diff --git a/.github/workflows/manual_release.yml b/.github/workflows/manual_release.yml index 5f54e91..8165d39 100644 --- a/.github/workflows/manual_release.yml +++ b/.github/workflows/manual_release.yml @@ -4,11 +4,11 @@ on: workflow_dispatch: inputs: branch: - description: "Branch to build and release" + description: 'Branch to build and release' required: true - default: "master" + default: 'master' tag: - description: "Tag to build and release" + description: 'Tag to build and release' required: true env: @@ -17,7 +17,7 @@ env: jobs: build-and-push: - runs-on: "ubuntu-latest" + runs-on: 'ubuntu-latest' timeout-minutes: 20 steps: - name: Checkout diff --git a/.husky/pre-commit b/.husky/pre-commit index f988b4e..a717a36 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -3,3 +3,6 @@ npm run prettier:lint npm run eslint:lint +npm run generate-envs + +git add docs/env-vars.md diff --git a/docs/env-vars.md b/docs/env-vars.md index 1458853..9458df0 100644 --- a/docs/env-vars.md +++ b/docs/env-vars.md @@ -4,36 +4,41 @@ _Object containing the following properties:_ -| Property | Description | Type | Default | -| :----------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :--------------------- | :------------------------------------------------------- | -| `EXCLUDED_NODES` | NodeRed nodes that are going to be excluded from SmartFlows. If any node type of the SmartFlow equals to one of the EXCLUDED_NODES it will prevent SmartFlow from being installed. | `string` | `'file,file in,watch,exec'` | -| `TARGET_SOLUTION_NAMESPACES` | If set it will only install solutions that namespaces are specified in this env. variable. Comma-separated. | `string` | | -| `RETRY_WORKER_CHECKS` | If it's enabled it will indefinitely await for worker to pass all checks otherwise it will kill process. Useful e.g if worker account is not yet assigned to operator at the moment of configuration. | `boolean` (_nullable_) | `true` | -| `ENABLE_HEALTH_API` | If it's enabled it will enable health check routes. | `boolean` (_nullable_) | `true` | -| **`PALLET_RPC_URL`** (\*) | Read EWX Parachain URL | `string` (_url_) | | -| **`VOTING_RPC_URL`** (\*) | Write EWX Parachain URL | `string` (_url_) | | -| **`VOTING_WORKER_SEED`** (\*) | Seed of the worker (not operator) | `string` | | -| `PORT` | Port number of NodeRed Server. | `number` (_>0_) | `8000` | -| `HOST` | Hostname of NodeRed Server. | `string` | `'localhost'` | -| `RED_ENABLE_UI` | Should enable NodeRed UI. | `'true' \| 'false'` | `'false'` | -| `RED_DIRECTORY` | Storage of NodeRed flows. | `string` | `'./node-red-data'` | -| `SQLITE_BASE_PATH` | Base SQLite path. | `string` | `'./sqlite'` | -| `IPFS_API_KEY` | IPFS API Key | `string` (_nullable_) | `null` | -| `IPFS_SECRET_KEY` | IPFS Secret Key | `string` (_nullable_) | `null` | -| `IPFS_URL` | IPFS BASE Url | `string` | `'https://workers-registry.energywebx.com'` | -| `IPFS_CONTEXT_PATH` | IPFS Context Path | `string` | `'/ipfs/'` | -| `IPFS_USER_AGENT_VALUE` | Default user agent that is going to be used for public IPFS queries. | `string` | `'ewx-worker-node-server'` | -| `SOLUTION_QUEUE_PROCESS_DELAY` | How often should refresh EWX Solutions information from chain (in miliseconds). | `number` (_nullable_) | `20000` | -| `LOCAL_SOLUTIONS_PATH` | Path to locally hosted NodeRed solution flow files to be used when installing solutions using "local" prefix within WorkLogic field. | `string` | | -| `SS58_FORMAT` | SS58 Key Format. | `number` (_>0_) | `42` | -| `PRETTY_PRINT` | Should pretty print logs. If you plan to use Grafana or any other log tooling it's recommended to set it to false. | `'true' \| 'false'` | `'false'` | -| `HEARTBEAT_PATH` | Path to the heartbeat file used for monitoring. | `string` | `'heartbeat_monitor.txt'` | -| `HEARTBEAT_INTERVAL` | Interval (in ms) at which the heartbeat process updates the file. | `number` (_>0_) | `5000` | -| `HEARTBEAT_PRINT_SUCCESS_LOG` | Should print successful logs. | `'true' \| 'false'` | `'true'` | -| `PALLET_AUTH_SERVER_LOGIN_URL` | Pallet Auth Server Url used for authentication to Workers Registry | `string` (_url_) | `'https://auth.energywebx.com/api/auth/login'` | -| `PALLET_AUTH_SERVER_DOMAIN` | Pallet Auth Server domain | `string` | `'default'` | -| `WORKER_REGISTRY_URL` | | `string` (_url_) | `'https://workers-registry.energywebx.com'` | -| `BASE_URLS` | Base URLs of EWX resources | `string` (_url_) | `'https://marketplace-cdn.energyweb.org/base_urls.json'` | -| `BUILD_METADATA_PATH` | Path to build metadata file | `string` | `'./build.json'` | +| Property | Description | Type | Default | +| :----------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :-------------------------- | :------------------------------------------------------- | +| `EXCLUDED_NODES` | NodeRed nodes that are going to be excluded from SmartFlows. If any node type of the SmartFlow equals to one of the EXCLUDED_NODES it will prevent SmartFlow from being installed. | `string` | `'file,file in,watch,exec'` | +| `TARGET_SOLUTION_NAMESPACES` | If set it will only install solutions that namespaces are specified in this env. variable. Comma-separated. | `string` | | +| `RETRY_WORKER_CHECKS` | If it's enabled it will indefinitely await for worker to pass all checks otherwise it will kill process. Useful e.g if worker account is not yet assigned to operator at the moment of configuration. | `boolean` (_nullable_) | `true` | +| `ENABLE_HEALTH_API` | If it's enabled it will enable health check routes. | `boolean` (_nullable_) | `true` | +| `PALLET_RPC_URL` | Read EWX Parachain URL. If provided, it will overwrite value provided from BASE_URLS. | `string` (_url_) | | +| **`VOTING_RPC_URL`** (\*) | Write EWX Parachain URL | `string` (_url_) | | +| **`VOTING_WORKER_SEED`** (\*) | Seed of the worker (not operator) | `string` | | +| `PORT` | Port number of NodeRed Server. | `number` (_>0_) | `8000` | +| `HOST` | Hostname of NodeRed Server. | `string` | `'localhost'` | +| `RED_ENABLE_UI` | Should enable NodeRed UI. | `'true' \| 'false'` | `'false'` | +| `RED_DIRECTORY` | Storage of NodeRed flows. | `string` | `'./node-red-data'` | +| `SQLITE_BASE_PATH` | Base SQLite path. | `string` | `'./sqlite'` | +| `IPFS_API_KEY` | IPFS API Key | `string` (_nullable_) | `null` | +| `IPFS_SECRET_KEY` | IPFS Secret Key | `string` (_nullable_) | `null` | +| `IPFS_URL` | IPFS BASE Url | `string` | `'https://workers-registry.energywebx.com'` | +| `IPFS_CONTEXT_PATH` | IPFS Context Path | `string` | `'/ipfs/'` | +| `IPFS_USER_AGENT_VALUE` | Default user agent that is going to be used for public IPFS queries. | `string` | `'ewx-worker-node-server'` | +| `SOLUTION_QUEUE_PROCESS_DELAY` | How often should refresh EWX Solutions information from chain (in miliseconds). | `number` (_nullable_) | `20000` | +| `LOCAL_SOLUTIONS_PATH` | Path to locally hosted NodeRed solution flow files to be used when installing solutions using "local" prefix within WorkLogic field. | `string` | | +| `SS58_FORMAT` | SS58 Key Format. | `number` (_>0_) | `42` | +| `PRETTY_PRINT` | Should pretty print logs. If you plan to use Grafana or any other log tooling it's recommended to set it to false. | `'true' \| 'false'` | `'false'` | +| `LOG_FILE_PATH` | Full path to log file (e.g., /var/log/app.log or ./logs/app.log). If not provided, file logging is disabled. | `string` | | +| `LOG_RETENTION_DAYS` | Number of days to keep rotated logs | `number` (_>0_) | | +| `HEARTBEAT_PATH` | Path to the heartbeat file used for monitoring. | `string` | `'heartbeat_monitor.txt'` | +| `HEARTBEAT_INTERVAL` | Interval (in ms) at which the heartbeat process updates the file. | `number` (_>0_) | `5000` | +| `HEARTBEAT_PRINT_SUCCESS_LOG` | Should print successful logs. | `'true' \| 'false'` | `'true'` | +| `PALLET_AUTH_SERVER_LOGIN_URL` | Pallet Auth Server Url used for authentication to Workers Registry. If provided, it will overwrite value provided from BASE_URLS. | `string` (_url_) | | +| `PALLET_AUTH_SERVER_DOMAIN` | Pallet Auth Server domain | `string` | `'default'` | +| `WORKER_REGISTRY_URL` | Url of Workers Registry that stores information about Worker Location. If provided, it will overwrite value provided from BASE_URLS. | `string` (_url_) | | +| `BASE_URLS` | Base URLs of EWX resources | `string` (_url_) | `'https://marketplace-cdn.energyweb.org/base_urls.json'` | +| `BUILD_METADATA_PATH` | Path to build metadata file | `string` | `'./build.json'` | +| `SHUTDOWN_TIMEOUT_MS` | Timeout in milliseconds for graceful shutdown (default: 30000) | `number` (_>0_) | `30000` | +| `ADMIN_SERVER_PORT` | Port number for admin server (default: 3003) | `number` (_>0_) | `3003` | +| `ADMIN_API_KEY` | API key for admin endpoints authentication. Must be at least 32 characters. If not set, admin endpoints will be accessible without authentication. | `string` (_min length: 32_) | | _(\*) Required._ diff --git a/package-lock.json b/package-lock.json index 6cd667b..a5026d6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,21 +13,27 @@ "@energyweb/node-red-contrib-green-proof-worker": "2.5.1", "@polkadot/api": "14.0.1", "axios": "1.7.9", + "cache-manager": "7.2.5", "dotenv": "16.4.7", "express": "4.21.2", "express-async-handler": "1.2.0", "fastq": "1.19.0", + "jsonwebtoken": "9.0.2", "node-red": "4.0.8", + "object-hash": "3.0.0", "pino": "9.6.0", "pino-pretty": "13.0.0", + "pino-roll": "4.0.0", "promise-retry": "2.0.1", "zod": "3.24.1" }, "devDependencies": { "@commitlint/cli": "19.7.1", "@commitlint/config-conventional": "19.7.1", + "@types/jsonwebtoken": "9.0.10", "@types/node": "18.19.75", "@types/node-red": "1.3.5", + "@types/object-hash": "3.0.5", "@types/promise-retry": "1.1.6", "@typescript-eslint/eslint-plugin": "6.21.0", "eslint": "8.57.1", @@ -41,6 +47,10 @@ "prettier": "3.5.0", "typescript": "5.7.3", "zod2md": "0.1.4" + }, + "engines": { + "node": ">=22.21.1", + "npm": ">=10.9.0" } }, "node_modules/@babel/code-frame": { @@ -77,6 +87,25 @@ "node": ">=6.9.0" } }, + "node_modules/@cacheable/utils": { + "version": "2.3.2", + "resolved": "https://registry.npmjs.org/@cacheable/utils/-/utils-2.3.2.tgz", + "integrity": "sha512-8kGE2P+HjfY8FglaOiW+y8qxcaQAfAhVML+i66XJR3YX5FtyDqn6Txctr3K2FrbxLKixRRYYBWMbuGciOhYNDg==", + "license": "MIT", + "dependencies": { + "hashery": "^1.2.0", + "keyv": "^5.5.4" + } + }, + "node_modules/@cacheable/utils/node_modules/keyv": { + "version": "5.5.5", + "resolved": "https://registry.npmjs.org/keyv/-/keyv-5.5.5.tgz", + "integrity": "sha512-FA5LmZVF1VziNc0bIdCSA1IoSVnDCqE8HJIZZv2/W8YmoAM50+tnUgJR/gQZwEeIMleuIOnRnHA/UaZRNeV4iQ==", + "license": "MIT", + "dependencies": { + "@keyv/serialize": "^1.1.1" + } + }, "node_modules/@commitlint/cli": { "version": "19.7.1", "resolved": "https://registry.npmjs.org/@commitlint/cli/-/cli-19.7.1.tgz", @@ -1954,6 +1983,12 @@ "node": ">=18.0.0" } }, + "node_modules/@keyv/serialize": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@keyv/serialize/-/serialize-1.1.1.tgz", + "integrity": "sha512-dXn3FZhPv0US+7dtJsIi2R+c7qWYiReoEh5zUntWCf4oSpMNib8FDhSoed6m3QyZdx5hK7iLFkYk3rNxwt8vTA==", + "license": "MIT" + }, "node_modules/@mapbox/node-pre-gyp": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/@mapbox/node-pre-gyp/-/node-pre-gyp-1.0.11.tgz", @@ -3344,12 +3379,30 @@ "integrity": "sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ==", "dev": true }, + "node_modules/@types/jsonwebtoken": { + "version": "9.0.10", + "resolved": "https://registry.npmjs.org/@types/jsonwebtoken/-/jsonwebtoken-9.0.10.tgz", + "integrity": "sha512-asx5hIG9Qmf/1oStypjanR7iKTv0gXQ1Ov/jfrX6kS/EO0OFni8orbmGCn0672NHR3kXHwpAwR+B368ZGN/2rA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/ms": "*", + "@types/node": "*" + } + }, "node_modules/@types/mime": { "version": "1.3.5", "resolved": "https://registry.npmjs.org/@types/mime/-/mime-1.3.5.tgz", "integrity": "sha512-/pyBZWSLD2n0dcHE3hq8s8ZvcETHtEuF+3E7XVt0Ig2nvsVQXdghHVcEkIWjy9A0wKfTn97a/PSDYohKIlnP/w==", "dev": true }, + "node_modules/@types/ms": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/@types/ms/-/ms-2.1.0.tgz", + "integrity": "sha512-GsCCIZDE/p3i96vtEqx+7dBUGXrc7zeSK3wwPHIaRThS+9OhWIXRqzs4d6k1SVU8g91DrNRWxWUGhp5KXQb2VA==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/node": { "version": "18.19.75", "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.75.tgz", @@ -3430,6 +3483,13 @@ "jsonata": "2.0.5" } }, + "node_modules/@types/object-hash": { + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/@types/object-hash/-/object-hash-3.0.5.tgz", + "integrity": "sha512-WFGeSazfL5BWbEh5ACaAIs5RT6sbVIwBs1rgHUp+kZzX/gub41LEEYWTWbYnE/sKb7hDdPEsGa1Vmcaay2fS5g==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/passport": { "version": "1.0.17", "resolved": "https://registry.npmjs.org/@types/passport/-/passport-1.0.17.tgz", @@ -4427,6 +4487,12 @@ "ieee754": "^1.2.1" } }, + "node_modules/buffer-equal-constant-time": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz", + "integrity": "sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==", + "license": "BSD-3-Clause" + }, "node_modules/buffer-from": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/buffer-from/-/buffer-from-1.1.2.tgz", @@ -4487,6 +4553,25 @@ "node": ">= 0.8" } }, + "node_modules/cache-manager": { + "version": "7.2.5", + "resolved": "https://registry.npmjs.org/cache-manager/-/cache-manager-7.2.5.tgz", + "integrity": "sha512-Y5LF7olTrcKJn1NoKiWPOvjEiO5DfDVPxqZHETCRMaliC60KBNb4Ge/vEYep5TyaqpXvnpnPPo8zauCe6UzZwA==", + "license": "MIT", + "dependencies": { + "@cacheable/utils": "^2.3.0", + "keyv": "^5.5.4" + } + }, + "node_modules/cache-manager/node_modules/keyv": { + "version": "5.5.5", + "resolved": "https://registry.npmjs.org/keyv/-/keyv-5.5.5.tgz", + "integrity": "sha512-FA5LmZVF1VziNc0bIdCSA1IoSVnDCqE8HJIZZv2/W8YmoAM50+tnUgJR/gQZwEeIMleuIOnRnHA/UaZRNeV4iQ==", + "license": "MIT", + "dependencies": { + "@keyv/serialize": "^1.1.1" + } + }, "node_modules/cacheable-lookup": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/cacheable-lookup/-/cacheable-lookup-7.0.0.tgz", @@ -5216,6 +5301,16 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/date-fns": { + "version": "4.1.0", + "resolved": "https://registry.npmjs.org/date-fns/-/date-fns-4.1.0.tgz", + "integrity": "sha512-Ukq0owbQXxa/U3EGtsdVBkR1w7KOQ5gIBqdH2hkvknzZPYvBxb/aa6E8L7tmjFtkwZBu3UXBbjIgPo/Ez4xaNg==", + "license": "MIT", + "funding": { + "type": "github", + "url": "https://github.com/sponsors/kossnocorp" + } + }, "node_modules/dateformat": { "version": "4.6.3", "resolved": "https://registry.npmjs.org/dateformat/-/dateformat-4.6.3.tgz", @@ -5510,6 +5605,15 @@ "safer-buffer": "^2.1.0" } }, + "node_modules/ecdsa-sig-formatter": { + "version": "1.0.11", + "resolved": "https://registry.npmjs.org/ecdsa-sig-formatter/-/ecdsa-sig-formatter-1.0.11.tgz", + "integrity": "sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==", + "license": "Apache-2.0", + "dependencies": { + "safe-buffer": "^5.0.1" + } + }, "node_modules/ee-first": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz", @@ -7410,6 +7514,18 @@ "resolved": "https://registry.npmjs.org/hash-sum/-/hash-sum-2.0.0.tgz", "integrity": "sha512-WdZTbAByD+pHfl/g9QSsBIIwy8IT+EsPiKDs0KNX+zSHhdDLFKdZu0BQHljvO+0QI/BasbMSUa8wYNCZTvhslg==" }, + "node_modules/hashery": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/hashery/-/hashery-1.3.0.tgz", + "integrity": "sha512-fWltioiy5zsSAs9ouEnvhsVJeAXRybGCNNv0lvzpzNOSDbULXRy7ivFWwCCv4I5Am6kSo75hmbsCduOoc2/K4w==", + "license": "MIT", + "dependencies": { + "hookified": "^1.13.0" + }, + "engines": { + "node": ">=20" + } + }, "node_modules/hasown": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.2.tgz", @@ -7426,6 +7542,12 @@ "resolved": "https://registry.npmjs.org/help-me/-/help-me-5.0.0.tgz", "integrity": "sha512-7xgomUX6ADmcYzFik0HzAxh/73YlKR9bmFzf51CZwR+b6YtzU2m0u49hQCqV6SvlqIqsaxovfwdvbnsw3b/zpg==" }, + "node_modules/hookified": { + "version": "1.13.0", + "resolved": "https://registry.npmjs.org/hookified/-/hookified-1.13.0.tgz", + "integrity": "sha512-6sPYUY8olshgM/1LDNW4QZQN0IqgKhtl/1C8koNZBJrKLBk3AZl6chQtNwpNztvfiApHMEwMHek5rv993PRbWw==", + "license": "MIT" + }, "node_modules/hpagent": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/hpagent/-/hpagent-1.2.0.tgz", @@ -8297,6 +8419,34 @@ "node": "*" } }, + "node_modules/jsonwebtoken": { + "version": "9.0.2", + "resolved": "https://registry.npmjs.org/jsonwebtoken/-/jsonwebtoken-9.0.2.tgz", + "integrity": "sha512-PRp66vJ865SSqOlgqS8hujT5U4AOgMfhrwYIuIhfKaoSCZcirrmASQr8CX7cUg+RMih+hgznrjp99o+W4pJLHQ==", + "license": "MIT", + "dependencies": { + "jws": "^3.2.2", + "lodash.includes": "^4.3.0", + "lodash.isboolean": "^3.0.3", + "lodash.isinteger": "^4.0.4", + "lodash.isnumber": "^3.0.3", + "lodash.isplainobject": "^4.0.6", + "lodash.isstring": "^4.0.1", + "lodash.once": "^4.0.0", + "ms": "^2.1.1", + "semver": "^7.5.4" + }, + "engines": { + "node": ">=12", + "npm": ">=6" + } + }, + "node_modules/jsonwebtoken/node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, "node_modules/jsprim": { "version": "1.4.2", "resolved": "https://registry.npmjs.org/jsprim/-/jsprim-1.4.2.tgz", @@ -8311,6 +8461,27 @@ "node": ">=0.6.0" } }, + "node_modules/jwa": { + "version": "1.4.2", + "resolved": "https://registry.npmjs.org/jwa/-/jwa-1.4.2.tgz", + "integrity": "sha512-eeH5JO+21J78qMvTIDdBXidBd6nG2kZjg5Ohz/1fpa28Z4CcsWUzJ1ZZyFq/3z3N17aZy+ZuBoHljASbL1WfOw==", + "license": "MIT", + "dependencies": { + "buffer-equal-constant-time": "^1.0.1", + "ecdsa-sig-formatter": "1.0.11", + "safe-buffer": "^5.0.1" + } + }, + "node_modules/jws": { + "version": "3.2.2", + "resolved": "https://registry.npmjs.org/jws/-/jws-3.2.2.tgz", + "integrity": "sha512-YHlZCB6lMTllWDtSPHz/ZXTsi8S00usEV6v1tjq8tOUZzw7DpSDWVXjXDre6ed1w/pd495ODpHZYSdkRTsa0HA==", + "license": "MIT", + "dependencies": { + "jwa": "^1.4.1", + "safe-buffer": "^5.0.1" + } + }, "node_modules/kafkajs": { "version": "2.2.4", "resolved": "https://registry.npmjs.org/kafkajs/-/kafkajs-2.2.4.tgz", @@ -8561,11 +8732,40 @@ "resolved": "https://registry.npmjs.org/lodash.clonedeep/-/lodash.clonedeep-4.5.0.tgz", "integrity": "sha512-H5ZhCF25riFd9uB5UCkVKo61m3S/xZk1x4wA6yp/L3RFP6Z/eHH1ymQcGLo7J3GMPfm0V/7m1tryHuGVxpqEBQ==" }, + "node_modules/lodash.includes": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", + "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==", + "license": "MIT" + }, + "node_modules/lodash.isboolean": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", + "integrity": "sha512-Bz5mupy2SVbPHURB98VAcw+aHh4vRV5IPNhILUCsOzRmsTmSQ17jIuqopAentWoehktxGd9e/hbIXq980/1QJg==", + "license": "MIT" + }, + "node_modules/lodash.isinteger": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/lodash.isinteger/-/lodash.isinteger-4.0.4.tgz", + "integrity": "sha512-DBwtEWN2caHQ9/imiNeEA5ys1JoRtRfY3d7V9wkqtbycnAmTvRRmbHKDV4a0EYc678/dia0jrte4tjYwVBaZUA==", + "license": "MIT" + }, + "node_modules/lodash.isnumber": { + "version": "3.0.3", + "resolved": "https://registry.npmjs.org/lodash.isnumber/-/lodash.isnumber-3.0.3.tgz", + "integrity": "sha512-QYqzpfwO3/CWf3XP+Z+tkQsfaLL/EnUlXWVkIk5FUPc4sBdTehEqZONuyRt2P67PXAk+NXmTBcc97zw9t1FQrw==", + "license": "MIT" + }, "node_modules/lodash.isplainobject": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/lodash.isplainobject/-/lodash.isplainobject-4.0.6.tgz", - "integrity": "sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA==", - "dev": true + "integrity": "sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA==" + }, + "node_modules/lodash.isstring": { + "version": "4.0.1", + "resolved": "https://registry.npmjs.org/lodash.isstring/-/lodash.isstring-4.0.1.tgz", + "integrity": "sha512-0wJxfxH1wgO3GrbuP+dTTk7op+6L41QCXbGINEmD+ny/G/eCqGzxyCsh7159S+mgDDcoarnBw6PC1PS5+wUGgw==", + "license": "MIT" }, "node_modules/lodash.kebabcase": { "version": "4.1.1", @@ -8585,6 +8785,12 @@ "integrity": "sha512-GK3g5RPZWTRSeLSpgP8Xhra+pnjBC56q9FZYe1d5RN3TJ35dbkGy3YqBSMbyCrlbi+CM9Z3Jk5yTL7RCsqboyQ==", "dev": true }, + "node_modules/lodash.once": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/lodash.once/-/lodash.once-4.1.1.tgz", + "integrity": "sha512-Sb487aTOCr9drQVL8pIxOzVhafOjZN9UU54hiN8PU3uAiSV7lx1yYNpbNmex2PK6dSJoNTSJUUswT651yww3Mg==", + "license": "MIT" + }, "node_modules/lodash.snakecase": { "version": "4.1.1", "resolved": "https://registry.npmjs.org/lodash.snakecase/-/lodash.snakecase-4.1.1.tgz", @@ -9516,6 +9722,15 @@ "node": ">=0.10.0" } }, + "node_modules/object-hash": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/object-hash/-/object-hash-3.0.0.tgz", + "integrity": "sha512-RSn9F68PjH9HqtltsSnqYC1XXoWe9Bju5+213R98cNGttag9q9yAOTzdbsqvIa7aNm5WffBZFpWYr2aWrklWAw==", + "license": "MIT", + "engines": { + "node": ">= 6" + } + }, "node_modules/object-inspect": { "version": "1.13.4", "resolved": "https://registry.npmjs.org/object-inspect/-/object-inspect-1.13.4.tgz", @@ -9988,6 +10203,16 @@ "pino-pretty": "bin.js" } }, + "node_modules/pino-roll": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/pino-roll/-/pino-roll-4.0.0.tgz", + "integrity": "sha512-axI1aQaIxXdw1F4OFFli1EDxIrdYNGLowkw/ZoZogX8oCSLHUghzwVVXUS8U+xD/Savwa5IXpiXmsSGKFX/7Sg==", + "license": "MIT", + "dependencies": { + "date-fns": "^4.1.0", + "sonic-boom": "^4.0.1" + } + }, "node_modules/pino-std-serializers": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/pino-std-serializers/-/pino-std-serializers-7.0.0.tgz", diff --git a/package.json b/package.json index 1350eb9..bd0ef23 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,11 @@ "version": "1.0.0", "description": "", "main": "dist/main.js", + "engineStrict": true, + "engines": { + "node": ">=22.21.1", + "npm": ">=10.9.0" + }, "scripts": { "build": "tsc", "build:watch": "tsc --watch", @@ -21,21 +26,27 @@ "@energyweb/node-red-contrib-green-proof-worker": "2.5.1", "@polkadot/api": "14.0.1", "axios": "1.7.9", + "cache-manager": "7.2.5", "dotenv": "16.4.7", "express": "4.21.2", "express-async-handler": "1.2.0", "fastq": "1.19.0", + "jsonwebtoken": "9.0.2", "node-red": "4.0.8", + "object-hash": "3.0.0", "pino": "9.6.0", "pino-pretty": "13.0.0", + "pino-roll": "4.0.0", "promise-retry": "2.0.1", "zod": "3.24.1" }, "devDependencies": { "@commitlint/cli": "19.7.1", "@commitlint/config-conventional": "19.7.1", + "@types/jsonwebtoken": "9.0.10", "@types/node": "18.19.75", "@types/node-red": "1.3.5", + "@types/object-hash": "3.0.5", "@types/promise-retry": "1.1.6", "@typescript-eslint/eslint-plugin": "6.21.0", "eslint": "8.57.1", diff --git a/src/auth/login.ts b/src/auth/login.ts index f017489..7a10ad3 100644 --- a/src/auth/login.ts +++ b/src/auth/login.ts @@ -1,12 +1,97 @@ import { type KeyringPair } from '@polkadot/keyring/types'; import axios from 'axios'; import { MAIN_CONFIG } from '../config'; -import { createLogger } from '../util'; import { AccountType, prepareSignInPayload, type SignInDto } from './sign-in-payload'; +import jwt from 'jsonwebtoken'; +import { z } from 'zod'; +import { createLogger } from '../util/logger'; +import { createKeyringPair } from '../polkadot/account'; +import { type BaseUrlsConfig, getBaseUrls } from '../util/base-urls'; + +const authResponseSchema = z.object({ + accessToken: z.string(), +}); + +export type AuthResponse = z.infer; const logger = createLogger('Auth'); -export const createToken = async (account: KeyringPair): Promise => { +let token: { token: string; expiresAt: number } | null = null; + +export const getToken = async (): Promise => { + try { + const account: KeyringPair = createKeyringPair(); + + if (token == null || token.expiresAt - 60 < getCurrentTimestampInSeconds()) { + token = await createToken(account); + } + + if (token == null) { + return null; + } + + return token.token; + } catch (e) { + logger.error('failed to get token, fallbacking to null'); + logger.error(e); + + return null; + } +}; + +const decodeToken = (token: string): { expiresAt: number } | null => { + const decoded = jwt.decode(token, { + json: true, + complete: true, + }); + + if (decoded == null) { + logger.error('unable to decode token, it is malformed or corrupted'); + + return null; + } + + if (typeof decoded.payload === 'string') { + logger.error( + { + payload: decoded.payload, + }, + 'token payload is not an object', + ); + + return null; + } + + if (decoded.payload.exp == null) { + logger.error( + { + payload: decoded.payload, + }, + 'token expiration is not provided', + ); + + return null; + } + + logger.info( + { + issuer: decoded.payload.iss, + subject: decoded.payload.sub, + audience: decoded.payload.aud, + expiration: decoded.payload.exp, + issuedAt: decoded.payload.iat, + }, + 'decoded token', + ); + + return { + expiresAt: decoded.payload.exp, + }; +}; + +export const createToken = async ( + account: KeyringPair, +): Promise<{ token: string; expiresAt: number } | null> => { const signedPayload = prepareSignInPayload( { accountType: AccountType.WORKER, @@ -20,17 +105,80 @@ export const createToken = async (account: KeyringPair): Promise signature: signedPayload.signature, }; + const accessToken = await obtainTokenFromAuthServer(payload, account); + + if (accessToken === null) { + logger.warn( + { + address: account.address, + }, + 'unable to sign-in', + ); + + return null; + } + + const decodedToken = decodeToken(accessToken); + + if (decodedToken == null) { + logger.warn( + { + address: account.address, + }, + 'unable to decode token', + ); + + return null; + } + logger.info( { address: account.address, }, - `preparing signed payload`, + `retrieved token`, + ); + + return { + token: accessToken, + expiresAt: decodedToken?.expiresAt, + }; +}; + +const getCurrentTimestampInSeconds = (): number => { + return Math.floor(Date.now() / 1000); +}; + +const obtainTokenFromAuthServer = async ( + payload: SignInDto, + account: KeyringPair, +): Promise => { + const baseUrls: BaseUrlsConfig = await getBaseUrls(); + + const authUrl: string = baseUrls.authServerUrl.includes('api/auth/login') + ? baseUrls.authServerUrl + : `${baseUrls.authServerUrl}/api/auth/login`; + + logger.info( + { + authServerUrl: authUrl, + }, + 'attempting to sign-in', ); - const result = await axios - .post(MAIN_CONFIG.PALLET_AUTH_SERVER_LOGIN_URL, { + const result: AuthResponse | null = await axios + .post(authUrl, { ...payload, }) + .then((r) => { + try { + return authResponseSchema.parse(r.data); + } catch (e) { + logger.error('failed to parse response'); + logger.error(e); + + return null; + } + }) .catch((e) => { logger.error( { @@ -45,23 +193,9 @@ export const createToken = async (account: KeyringPair): Promise return null; }); - if (result === null) { - logger.warn( - { - address: account.address, - }, - 'unable to sign-in', - ); - + if (result == null) { return null; } - logger.info( - { - address: account.address, - }, - `retrieved token`, - ); - - return result.data.accessToken; + return result.accessToken; }; diff --git a/src/registry.ts b/src/auth/registry.ts similarity index 86% rename from src/registry.ts rename to src/auth/registry.ts index 4a72660..5f03d9b 100644 --- a/src/registry.ts +++ b/src/auth/registry.ts @@ -1,12 +1,13 @@ import type { KeyringPair } from '@polkadot/keyring/types'; import axios from 'axios'; -import { MAIN_CONFIG } from './config'; -import { createLogger, createReadPalletApi } from './util'; import promiseRetry from 'promise-retry'; import { u8aToHex } from '@polkadot/util'; -import { createToken } from './auth/login'; import { type ApiPromise } from '@polkadot/api'; -import { AppVersion } from './version'; +import { createReadPalletApi } from '../util/pallet-api'; +import { createLogger } from '../util/logger'; +import { getToken } from './login'; +import { getBaseUrls } from '../util/base-urls'; +import { AppVersion } from '../util/version'; const logger = createLogger('WorkersRegistry'); @@ -27,7 +28,7 @@ export const registerWorker = async (account: KeyringPair): Promise => { try { logger.info('attempting to retrieve token'); - const token: string | null = await createToken(account); + const token: string | null = await getToken(); if (token == null) { return retry(new Error('failed to sign in')); @@ -58,6 +59,8 @@ export const registerWorker = async (account: KeyringPair): Promise => { return retry(new Error('unable to register worker in registry')); } } catch (e) { + logger.error(e); + return retry(e); } finally { await api.disconnect(); @@ -80,9 +83,11 @@ const storeWorkerInRegistry = async (token: string, account: KeyringPair): Promi ), ); + const { workersRegistryUrl } = await getBaseUrls(); + await axios .post( - MAIN_CONFIG.WORKER_REGISTRY_URL + `/api/v1/workers`, + workersRegistryUrl + `/api/v1/workers`, { source, signature, @@ -108,7 +113,9 @@ const getWorkerRegistrationStatus = async ( workerAddress: string, api: ApiPromise, ): Promise => { - const path = MAIN_CONFIG.WORKER_REGISTRY_URL + `/api/v1/workers/${workerAddress}`; + const { workersRegistryUrl } = await getBaseUrls(); + + const path = workersRegistryUrl + `/api/v1/workers/${workerAddress}`; return await axios .get(path, { @@ -142,7 +149,7 @@ const getWorkerRegistrationStatus = async ( } logger.error(e.message); - logger.error(e.response.data); + logger.error(e.data); return WorkerRegistrationStatus.ERROR; }); diff --git a/src/checks.ts b/src/checks.ts index b579dba..54c301d 100644 --- a/src/checks.ts +++ b/src/checks.ts @@ -1,14 +1,14 @@ import { type ApiPromise } from '@polkadot/api'; import { type KeyringPair } from '@polkadot/keyring/types'; import { MAIN_CONFIG } from './config'; -import { sleep } from './util'; import { getOperatorSubscriptions, isConnectedAsWorker, retryHttpAsyncCall, } from './polkadot/polka'; -import axios from 'axios'; -import z from 'zod'; +import { sleep } from './util/sleep'; +import { getBaseUrls } from './util/base-urls'; +import { saveOperatorAddress } from './util/operator-address-cache'; export const runChecks = async (api: ApiPromise, account: KeyringPair, logger): Promise => { const shouldRetryInfinite: boolean = MAIN_CONFIG.RETRY_WORKER_CHECKS; @@ -55,6 +55,9 @@ export const performInitialChecks = async ( logger.info({ operatorAddress }, 'operator address'); + // Save operator address to local cache (with TTL so operator changes are reflected) + await saveOperatorAddress(account.address, operatorAddress); + const operatorSubscriptions: string[] = await retryHttpAsyncCall( async () => await getOperatorSubscriptions(api, operatorAddress), ); @@ -67,7 +70,7 @@ export const performInitialChecks = async ( logger.info({ operatorSubscriptions }, 'operator subscriptions'); - const baseUrlConfigsValid: boolean = await validateBaseUrls(MAIN_CONFIG.BASE_URLS, logger); + const baseUrlConfigsValid: boolean = await validateBaseUrls(); if (!baseUrlConfigsValid) { logger.error( @@ -81,35 +84,9 @@ export const performInitialChecks = async ( return true; }; -const validateBaseUrls = async (baseUrl: string, logger): Promise => { - const BaseUrlsConfig = z.object({ - kafka_url: z.union([z.string(), z.array(z.string())]).optional(), - kafka_proxy_url: z.string().optional(), - indexer_url: z.string().optional(), - rpc_url: z.string().optional(), - workers_registry_url: z.string().optional(), - workers_nominator_url: z.string().optional(), - cas_normalizer_url: z.string().optional(), - }); - - const receivedConfig = await axios - .get(baseUrl) - .then((x) => x.data) - .catch((e) => { - logger.error('failed to fetch base url'); - logger.error(e); - - return false; - }); - - try { - BaseUrlsConfig.parse(receivedConfig); - - return true; - } catch (e) { - logger.error('failed to parse base urls config'); - logger.error(e); +const validateBaseUrls = async (): Promise => { + // getBaseUrls will throw exception + await getBaseUrls(); - return false; - } + return true; }; diff --git a/src/config.ts b/src/config.ts index 1908770..2fa7a4d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -63,7 +63,13 @@ export const ENV_SCHEMA = z.object({ .boolean() .default(true) .describe(`If it's enabled it will enable health check routes.`), - PALLET_RPC_URL: z.string().url().describe('Read EWX Parachain URL'), + PALLET_RPC_URL: z + .string() + .url() + .describe( + 'Read EWX Parachain URL. If provided, it will overwrite value provided from BASE_URLS.', + ) + .optional(), VOTING_RPC_URL: z.string().url().describe('Write EWX Parachain URL'), VOTING_WORKER_SEED: z.string().describe('Seed of the worker (not operator)'), PORT: z.coerce.number().positive().default(8000).describe('Port number of NodeRed Server.'), @@ -101,6 +107,17 @@ export const ENV_SCHEMA = z.object({ .describe( `Should pretty print logs. If you plan to use Grafana or any other log tooling it's recommended to set it to false.`, ), + LOG_FILE_PATH: z + .string() + .optional() + .describe( + 'Full path to log file (e.g., /var/log/app.log or ./logs/app.log). If not provided, file logging is disabled.', + ), + LOG_RETENTION_DAYS: z.coerce + .number() + .positive() + .optional() + .describe('Number of days to keep rotated logs'), HEARTBEAT_PATH: z .string() .default('heartbeat_monitor.txt') @@ -118,19 +135,41 @@ export const ENV_SCHEMA = z.object({ PALLET_AUTH_SERVER_LOGIN_URL: z .string() .url() - .default('https://auth.energywebx.com/api/auth/login') - .describe('Pallet Auth Server Url used for authentication to Workers Registry'), + .describe( + 'Pallet Auth Server Url used for authentication to Workers Registry. If provided, it will overwrite value provided from BASE_URLS.', + ) + .optional(), PALLET_AUTH_SERVER_DOMAIN: z.string().default('default').describe('Pallet Auth Server domain'), WORKER_REGISTRY_URL: z .string() - .url('Url of Workers Registry that stores information about Worker Location') - .default('https://workers-registry.energywebx.com'), + .url() + .describe( + 'Url of Workers Registry that stores information about Worker Location. If provided, it will overwrite value provided from BASE_URLS.', + ) + .optional(), BASE_URLS: z .string() .url() .default('https://marketplace-cdn.energyweb.org/base_urls.json') .describe('Base URLs of EWX resources'), BUILD_METADATA_PATH: z.string().default('./build.json').describe('Path to build metadata file'), + SHUTDOWN_TIMEOUT_MS: z.coerce + .number() + .positive() + .default(30000) + .describe('Timeout in milliseconds for graceful shutdown (default: 30000)'), + ADMIN_SERVER_PORT: z.coerce + .number() + .positive() + .default(3003) + .describe('Port number for admin server (default: 3003)'), + ADMIN_API_KEY: z + .string() + .min(32) + .optional() + .describe( + 'API key for admin endpoints authentication. Must be at least 32 characters. If not set, admin endpoints will be accessible without authentication.', + ), }); export const MAIN_CONFIG: z.infer = (process.env.__SKIP_PARSE_CONFIG === 'true' diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 0000000..676090a --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,59 @@ +export class BaseUrlsInvalidFormatError extends Error { + public constructor() { + super(); + } +} + +export class FailedToFetchBaseUrlsError extends Error { + public constructor() { + super(); + } +} + +export class MissingVotingWorkerSeedError extends Error { + public constructor() { + super('Missing VOTING_WORKER_SEED'); + } +} + +export class NodeRedRuntimeStartFailedError extends Error { + public constructor() { + super('Failed to start runtime'); + } +} + +export class InvertObjectEmptyObjectError extends Error { + public constructor() { + super('Object to invert is empty'); + } +} + +export class NodeRedNodePropertyNotFound extends Error { + public constructor(nodeId: string, key: string) { + super(`Node with id ${nodeId} not found on key ${key}`); + } +} + +export class BuildMetadataPathNotFoundError extends Error { + public constructor() { + super('BUILD_METADATA_PATH does not exist'); + } +} + +export class UnableToDecodeSolutionGroup extends Error { + public constructor() { + super(`Unable to decode solution group`); + } +} + +export class UnableToObtainStakeError extends Error { + public constructor() { + super(`Unable to obtain stake`); + } +} + +export class LocalSolutionsFileNotFoundError extends Error { + public constructor() { + super(`Local solutions file not found`); + } +} diff --git a/src/health.ts b/src/health/health.ts similarity index 53% rename from src/health.ts rename to src/health/health.ts index 12a174b..203c8ac 100644 --- a/src/health.ts +++ b/src/health/health.ts @@ -1,8 +1,7 @@ -import { createLogger } from './util'; -import { getAllInstalledSolutionsNames, runtimeStarted } from './node-red/red'; -import express from 'express'; -import asyncHandler from 'express-async-handler'; -import { MAIN_CONFIG } from './config'; +import { getAllInstalledSolutionsNames, runtimeStarted } from '../node-red/red'; +import { createKeyringPair } from '../polkadot/account'; +import { MAIN_CONFIG } from '../config'; +import { getOperatorInfo, type OperatorInfo } from '../util/operator-info'; enum HealthStatus { OK = 'OK', @@ -14,37 +13,6 @@ enum ComponentName { READY = 'READY', } -export const createHealthRouter = (): express.Router | null => { - if (!MAIN_CONFIG.ENABLE_HEALTH_API) { - return null; - } - - const healthLogger = createLogger('Health'); - - const healthRouter: express.Router = express.Router({ mergeParams: true }); - - healthRouter.get('/health/liveness', (req, res) => { - const health = isLive(); - - healthLogger.debug('requested liveness'); - - res.json(health); - }); - - healthRouter.get( - '/health/readiness', - asyncHandler(async (req, res) => { - const result = await isReady(); - - healthLogger.debug(result, 'requested readiness'); - - res.json(result); - }), - ); - - return healthRouter; -}; - interface ComponentHealthStatus { status: HealthStatus; name: ComponentName | string; @@ -58,14 +26,21 @@ interface NodeRedHealthStatus extends ComponentHealthStatus { }; } -const isLive = (): ComponentHealthStatus => { +interface SolutionGroupsDetailsStatus { + timestamp: string; + rpcUrl?: string; + workerAddress?: string; + operator?: OperatorInfo | null; +} + +export const isLive = (): ComponentHealthStatus => { return { status: HealthStatus.OK, name: 'LIVE', }; }; -const isReady = async (): Promise => { +export const isReady = async (): Promise => { return [await getNodeRedHealth()]; }; @@ -94,3 +69,26 @@ export const getNodeRedHealth = async (): Promise => { }, }; }; + +export const getSolutionGroupsDetailsStatus = async (): Promise => { + const timestamp = new Date().toISOString(); + const rpcUrl = MAIN_CONFIG.PALLET_RPC_URL; + + let account: ReturnType; + try { + account = createKeyringPair(); + } catch { + // No worker identity (e.g. config not ready); return config only + return { timestamp, rpcUrl }; + } + + const workerAddress = account.address; + const operatorInfo = await getOperatorInfo(); + + return { + timestamp, + rpcUrl, + workerAddress, + operator: operatorInfo, + }; +}; diff --git a/src/heartbeat.ts b/src/health/heartbeat.ts similarity index 83% rename from src/heartbeat.ts rename to src/health/heartbeat.ts index 41d8632..6634d6f 100644 --- a/src/heartbeat.ts +++ b/src/health/heartbeat.ts @@ -1,7 +1,7 @@ import { writeFileSync } from 'fs'; -import { MAIN_CONFIG } from './config'; -import { createLogger } from './util'; -import { runtimeStarted } from './node-red/red'; +import { runtimeStarted } from '../node-red/red'; +import { createLogger } from '../util/logger'; +import { MAIN_CONFIG } from '../config'; const logger = createLogger('Heartbeat'); diff --git a/src/main.ts b/src/main.ts index 4f0a713..e7c551b 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,22 +3,40 @@ import { cryptoWaitReady } from '@polkadot/util-crypto'; import { deleteAll, startRedServer } from './node-red/red'; import express from 'express'; import bodyParser from 'body-parser'; -import { createVoteRouter } from './vote'; import { pushToQueue } from './solution'; -import { createHealthRouter } from './health'; import { runChecks } from './checks'; -import { createKeyringPair } from './account'; -import { APP_BOOTSTRAP_STATUS, createStatusRouter, setAppState } from './status'; -import { createLogger, createReadPalletApi } from './util'; -import { createConfigRouter } from './worker-config'; import { retryHttpAsyncCall } from './polkadot/polka'; -import { startHeartbeat } from './heartbeat'; -import { registerWorker } from './registry'; +import { createLogger } from './util/logger'; +import { createReadPalletApi } from './util/pallet-api'; +import { APP_BOOTSTRAP_STATUS, setAppState } from './util/status'; +import { createKeyringPair } from './polkadot/account'; +import { registerWorker } from './auth/registry'; +import { startHeartbeat } from './health/heartbeat'; +import { createHealthRouter } from './routes/health'; +import { createVoteRouter } from './routes/vote'; +import { createStatusRouter } from './routes/status'; +import { createConfigRouter } from './routes/worker-config'; +import { createTokenRouter } from './routes/token'; +import { createAdminRouter } from './routes/admin'; +import { setMainServer, setNodeRedServer, setAdminServer } from './shutdown'; +import { MAIN_CONFIG } from './config'; + +const logger = createLogger('WorkerNode'); + +const registerProcessHandlers = (): void => { + process.on('unhandledRejection', (reason) => { + logger.error({ reason }, 'unhandled promise rejection'); + }); -void (async () => { - setAppState(APP_BOOTSTRAP_STATUS.STARTED); + process.on('uncaughtException', (error) => { + logger.error({ err: error }, 'uncaught exception'); + }); +}; - const logger = createLogger('WorkerNode'); +const bootstrap = async (): Promise => { + registerProcessHandlers(); + + setAppState(APP_BOOTSTRAP_STATUS.STARTED); const app = express(); @@ -28,6 +46,7 @@ void (async () => { const healthRouter: express.Router | null = createHealthRouter(); app.use(createVoteRouter()); + app.use(createTokenRouter()); app.use(createStatusRouter()); app.use(createConfigRouter()); @@ -37,9 +56,21 @@ void (async () => { app.use(healthRouter); } - app.listen(3002, () => { + const mainServer = app.listen(3002, () => { logger.info(`vote API exposed on port 3002`); }); + setMainServer(mainServer); + + // Start admin server separately + const adminApp = express(); + adminApp.use(bodyParser.json()); + adminApp.use(bodyParser.urlencoded({ extended: false })); + adminApp.use(createAdminRouter()); + + const adminServer = adminApp.listen(MAIN_CONFIG.ADMIN_SERVER_PORT, () => { + logger.info(`admin API exposed on port ${MAIN_CONFIG.ADMIN_SERVER_PORT}`); + }); + setAdminServer(adminServer); setAppState(APP_BOOTSTRAP_STATUS.EXPOSED_HTTP); @@ -66,7 +97,8 @@ void (async () => { await registerWorker(account); - await startRedServer(app); + const nodeRedServer = await startRedServer(app); + setNodeRedServer(nodeRedServer); setAppState(APP_BOOTSTRAP_STATUS.STARTED_RED_SERVER); @@ -80,4 +112,8 @@ void (async () => { logger.info('starting heartbeat'); startHeartbeat(); -})(); +}; + +void bootstrap().catch((error) => { + logger.error({ err: error }, 'bootstrap failed'); +}); diff --git a/src/node-red/node-red-cache.ts b/src/node-red/node-red-cache.ts index 8c7a6b4..8146389 100644 --- a/src/node-red/node-red-cache.ts +++ b/src/node-red/node-red-cache.ts @@ -1,4 +1,4 @@ -import { invertObject } from '../util'; +import { invertObject } from '../util/invert-object'; type NodeRedZId = string; type SolutionNamespace = string; diff --git a/src/node-red/red.ts b/src/node-red/red.ts index 274215b..a7abe69 100644 --- a/src/node-red/red.ts +++ b/src/node-red/red.ts @@ -3,8 +3,6 @@ import { MAIN_CONFIG } from '../config'; import * as RED from 'node-red'; import * as http from 'http'; import { type Flows } from '@node-red/runtime'; -import { type ParsedFlow, type RedNode, type RedNodes } from '../types'; -import { createLogger, sleep } from '../util'; import path from 'path'; import { mkdirSync, rmSync } from 'fs'; import { @@ -14,6 +12,11 @@ import { } from './node-red-cache'; import { getSmartFlow } from '../solution-source/solution-source'; import { type SolutionGroupId, type SolutionId, type WorkerAddress } from '../polkadot/polka'; +import { createLogger } from '../util/logger'; +import { sleep } from '../util/sleep'; +import { type ParsedFlow, type RedNode, type RedNodes } from './types'; +import { type BaseUrlsConfig, getBaseUrls } from '../util/base-urls'; +import { NodeRedNodePropertyNotFound, NodeRedRuntimeStartFailedError } from '../errors'; type EWX_ENVS = | 'EWX_SOLUTION' @@ -27,7 +30,7 @@ type EWX_ENVS = const redLogger = createLogger('NodeRed'); -export const startRedServer = async (app: express.Express): Promise => { +export const startRedServer = async (app: express.Express): Promise => { const loggerConfig = { console: { level: 'off', @@ -133,6 +136,8 @@ export const startRedServer = async (app: express.Express): Promise => { redLogger.info(`To access UI panel visit http://localhost:${port}/red`); }); + + return server; }; export const runtimeStarted = async (maxAttempts: number = 10): Promise => { @@ -149,7 +154,7 @@ export const runtimeStarted = async (maxAttempts: number = 10): Promise `exceeded max attempts of runtime start`, ); - throw new Error('Unable to check if runtime started'); + throw new NodeRedRuntimeStartFailedError(); } // Hacky way of testing if runtime is started - if it returns null, it means runtime has not started @@ -236,6 +241,8 @@ export const upsertSolution = async ( configs?: any[]; } = JSON.parse(content); + const baseUrls: BaseUrlsConfig = await getBaseUrls(); + const parsedFlow: ParsedFlow = modifyFlowIds( parsedContent, solutionGroupId, @@ -244,6 +251,7 @@ export const upsertSolution = async ( worklogicId, sqliteFilePath, workerAddress, + baseUrls.rpcUrl, ); if (parsedFlow === null) { @@ -284,7 +292,7 @@ export const getNodeEnv = ( const envMeta = node.env.find((x) => x.name === key); if (envMeta == null && throwOnError) { - throw new Error(`${key} not found in ${node.id}`); + throw new NodeRedNodePropertyNotFound(node.id, key); } else if (envMeta == null && !throwOnError) { return undefined; } @@ -375,21 +383,44 @@ export const deleteNodesBySolutionGroupId = async (solutionGroupIds: string[]): export const getAllInstalledSolutionsNames = async (): Promise => { const tabNodes = await getTabNodes(); - const solutionIds: Array = await Promise.all( - tabNodes.map(async (tabNode: RedNode) => { - const solutionId = getNodeEnv(tabNode, 'EWX_SOLUTION_ID', false); + const solutionIds: Array = tabNodes.map((tabNode: RedNode) => { + const solutionId = getNodeEnv(tabNode, 'EWX_SOLUTION_ID', false); - if (solutionId == null) { - return null; - } + if (solutionId == null) { + return null; + } - return solutionId; - }), - ); + return solutionId; + }); return solutionIds.filter((x) => x !== null); }; +export interface InstalledSolutionDetails { + solutionId: string; + solutionGroupId: string; +} + +export const getAllInstalledSolutionsWithGroups = async (): Promise => { + const tabNodes = await getTabNodes(); + + const solutions: Array = tabNodes.map((tabNode: RedNode) => { + const solutionId = getNodeEnv(tabNode, 'EWX_SOLUTION_ID', false); + const solutionGroupId = getNodeEnv(tabNode, 'EWX_SOLUTION_GROUP_ID', false); + + if (solutionId == null || solutionGroupId == null) { + return null; + } + + return { + solutionId, + solutionGroupId, + }; + }); + + return solutions.filter((x): x is InstalledSolutionDetails => x !== null); +}; + export const getTabNodes = async (): Promise => { const currentFlows: Flows = await getAllFlows(); @@ -445,6 +476,7 @@ export const modifyFlowIds = ( workLogic: string, sqlitePath: string, workerAddress: string, + rpcUrl: string, ): ParsedFlow | null => { if (parsedFlow?.nodes == null) { return null; @@ -498,7 +530,7 @@ export const modifyFlowIds = ( { type: 'str', name: 'EWX_RPC_URL', - value: MAIN_CONFIG.PALLET_RPC_URL, + value: rpcUrl, }, ]; diff --git a/src/types.ts b/src/node-red/types.ts similarity index 100% rename from src/types.ts rename to src/node-red/types.ts diff --git a/src/account.ts b/src/polkadot/account.ts similarity index 59% rename from src/account.ts rename to src/polkadot/account.ts index bb7145b..7277d1f 100644 --- a/src/account.ts +++ b/src/polkadot/account.ts @@ -1,12 +1,19 @@ -import { MAIN_CONFIG } from './config'; import { type KeyringPair } from '@polkadot/keyring/types'; import { Keyring } from '@polkadot/api'; +import { MAIN_CONFIG } from '../config'; +import { MissingVotingWorkerSeedError } from '../errors'; + +let keyringPair: KeyringPair | null = null; export const createKeyringPair = (): KeyringPair => { + if (keyringPair != null) { + return keyringPair; + } + const property: string = MAIN_CONFIG.VOTING_WORKER_SEED; if (property === '' || property == null) { - throw new Error('Missing VOTING_WORKER_SEED'); + throw new MissingVotingWorkerSeedError(); } const keyring = new Keyring({ type: 'sr25519', ss58Format: 42 }); @@ -15,5 +22,7 @@ export const createKeyringPair = (): KeyringPair => { return keyring.addFromUri(property.replace('_', '')); } - return keyring.addFromMnemonic(property); + keyringPair = keyring.addFromMnemonic(property); + + return keyringPair; }; diff --git a/src/ewx-tx-manager-http.ts b/src/polkadot/ewx-tx-manager-http.ts similarity index 100% rename from src/ewx-tx-manager-http.ts rename to src/polkadot/ewx-tx-manager-http.ts diff --git a/src/ewx-tx-manager.ts b/src/polkadot/ewx-tx-manager.ts similarity index 100% rename from src/ewx-tx-manager.ts rename to src/polkadot/ewx-tx-manager.ts diff --git a/src/polkadot/polka.ts b/src/polkadot/polka.ts index 1579090..ee3d9cb 100644 --- a/src/polkadot/polka.ts +++ b/src/polkadot/polka.ts @@ -6,8 +6,9 @@ import { blake2AsHex, cryptoWaitReady } from '@polkadot/util-crypto'; import pino from 'pino'; import promiseRetry from 'promise-retry'; import { type WrapOptions } from 'retry'; -import type { EwxTxManager } from '../ewx-tx-manager'; import { type Solution, type SolutionGroup } from './polka-types'; +import { type EwxTxManager } from './ewx-tx-manager'; +import { UnableToDecodeSolutionGroup, UnableToObtainStakeError } from '../errors'; export type WorkerAddress = string; export type OperatorAddress = string; @@ -94,7 +95,7 @@ export const getSolutionGroupsByIds = async ( curr.toPrimitive() as unknown as SolutionGroup; if (primitive == null) { - throw new Error('Unable to decode codec for Solution Group'); + throw new UnableToDecodeSolutionGroup(); } acc[primitive.namespace] = primitive; @@ -103,21 +104,40 @@ export const getSolutionGroupsByIds = async ( }, {}); }; -export const getSolutions = async (api: ApiPromise): Promise => { +export const getSolutions = async ( + api: ApiPromise, + operatorSubscriptions: string[], +): Promise => { const solutions = await api.query.workerNodePallet.solutions.entries(); + const solutionsWithGroups: Record = + await api.query.workerNodePallet.groupOfSolution.entries().then((x) => { + return x + .map(([solutionNamespace, groupOfSolution]) => { + return { + solutionNamespace: (solutionNamespace.toHuman() as unknown as SolutionId)[0], + groupOfSolution: groupOfSolution.toHuman() as unknown as SolutionGroupId, + }; + }) + .reduce((acc, curr) => { + acc[curr.solutionNamespace] = curr.groupOfSolution; + + return acc; + }, {}); + }); + const results: SolutionArray = await Promise.all( solutions.map(async ([namespaceHash, solution]) => { const solutionId: string = (namespaceHash.toHuman() as unknown as SolutionId[])[0]; const solutionPrimitive = solution.toPrimitive() as unknown as Solution; - const groupOfSolution = await api.query.workerNodePallet.groupOfSolution(solutionId); - - const solutionGroupId: string | null = - groupOfSolution.toPrimitive() as unknown as SolutionGroupId; - - return [solutionId, solutionGroupId, solutionPrimitive, solutionPrimitive.status]; + return [ + solutionId, + solutionsWithGroups[solutionId] ?? null, + solutionPrimitive, + solutionPrimitive.status, + ]; }), ); @@ -208,7 +228,7 @@ export const queryStake = async ( ); if (currentStake == null || period == null || nextStake == null) { - throw new Error('unable to obtain stake'); + throw new UnableToObtainStakeError(); } return { diff --git a/src/polkadot/vote.ts b/src/polkadot/vote.ts new file mode 100644 index 0000000..65940dc --- /dev/null +++ b/src/polkadot/vote.ts @@ -0,0 +1,147 @@ +import { Keyring } from '@polkadot/api'; +import { type KeyringPair } from '@polkadot/keyring/types'; +import type { queueAsPromised } from 'fastq'; +import * as fastq from 'fastq'; +import { createLogger } from '../util/logger'; +import { createEwxTxManager } from '../util/ewx-tx-manager'; +import { sleep } from '../util/sleep'; +import { MAIN_CONFIG } from '../config'; +import { retryHttpAsyncCall, submitSolutionResult } from './polka'; +import { createCache } from 'cache-manager'; + +const ONE_DAY_MS = 24 * 60 * 60 * 1000; + +export const VOTE_STORAGE = createCache({ + ttl: ONE_DAY_MS, +}); + +interface VoteTask { + votingRoundId: string; + noderedId: string; + vote: string; + startedAt: number; + solutionNamespace: string; + voteIdentifier: string | null; + hashVote: boolean; +} + +export const voteQueue: queueAsPromised = fastq.promise(asyncWorker, 4); + +const ewxTxManager = createEwxTxManager(); + +const DELAY_TIMER: number = 30 * 1000; +const NINE_MINUTES = 540000; + +export const voteQueueLogger = createLogger('VoteQueue'); + +voteQueue.error((error: Error | null, task: VoteTask) => { + if (error == null) { + return; + } + + voteQueueLogger.error({ task }, 'unexpected vote queue error'); + voteQueueLogger.error(error); +}); + +async function asyncWorker(arg: VoteTask): Promise { + const tempLogger = createLogger({ + name: `Solution-Vote-${arg.solutionNamespace}`, + base: arg, + }); + + try { + if (Date.now() - arg.startedAt >= NINE_MINUTES) { + tempLogger.warn('timeout passed for vote, abandoning it'); + + return; + } + + await processVoteQueue(arg); + } catch (e) { + tempLogger.error(`failed to submit vote`); + tempLogger.error(e); + + if (e.toString() === 'TypeError: fetch failed') { + await sleep(DELAY_TIMER); + + tempLogger.warn('attempting to retry vote'); + await voteQueue.push(arg); + } else { + tempLogger.warn('skipping vote, non-http error'); + } + } +} + +async function processVoteQueue(task: VoteTask): Promise { + const tempLogger = createLogger({ + name: `Solution-Vote-${task.solutionNamespace}`, + base: task, + }); + + const keyring = new Keyring({ type: 'sr25519' }); + + const account: KeyringPair = keyring.addFromMnemonic(MAIN_CONFIG.VOTING_WORKER_SEED); + + tempLogger.info('attempting to send vote'); + + await retryHttpAsyncCall( + async () => + await submitSolutionResult( + ewxTxManager, + account, + task.solutionNamespace, + task.vote, + task.votingRoundId, + task.hashVote, + ), + ) + .then(async (hash) => { + if (task.voteIdentifier != null) { + await VOTE_STORAGE.set( + task.voteIdentifier, + { + createdAt: Date.now(), + transactionHash: hash, + }, + ONE_DAY_MS, + ); + } + + tempLogger.info( + { + transactionHash: hash, + }, + 'submitted vote', + ); + + tempLogger.flush(); + }) + .catch(async (e) => { + // EWX Error - (some) http errors are handled in retryHttpAsyncCall fn + tempLogger.error( + { + task, + }, + 'failed to submit solution result - voting wont be retried', + ); + tempLogger.error(e); + tempLogger.flush(); + + // We do not throw for these kind of errors so we can continue processing + }); +} + +// Drains the vote queue, waiting for all pending and running tasks to complete. +export const drainVoteQueue = async (): Promise => { + voteQueueLogger.info( + { + pendingTasks: voteQueue.length(), + runningTasks: voteQueue.running(), + }, + 'draining vote queue', + ); + + await voteQueue.drained(); + + voteQueueLogger.info('vote queue drained successfully'); +}; diff --git a/src/routes/admin.ts b/src/routes/admin.ts new file mode 100644 index 0000000..fad726f --- /dev/null +++ b/src/routes/admin.ts @@ -0,0 +1,85 @@ +import express from 'express'; +import { createLogger } from '../util/logger'; +import { getIsShuttingDown } from '../shutdown'; +import asyncHandler from 'express-async-handler'; +import { MAIN_CONFIG } from '../config'; + +const adminLogger = createLogger('Admin'); + +/** + * Middleware to authenticate admin requests using API key + */ +const authenticateAdmin = ( + req: express.Request, + res: express.Response, + next: express.NextFunction, +): void => { + // If ADMIN_API_KEY is not configured, skip authentication + if (MAIN_CONFIG.ADMIN_API_KEY == null) { + next(); + return; + } + + const apiKey = req.headers['x-api-key']; + const apiKeyString = typeof apiKey === 'string' ? apiKey : null; + + if (apiKeyString == null || apiKeyString !== MAIN_CONFIG.ADMIN_API_KEY) { + adminLogger.warn( + { + ip: req.ip, + path: req.path, + }, + 'unauthorized admin access attempt', + ); + res.status(401).json({ + success: false, + message: 'Unauthorized: Invalid or missing API key', + }); + return; + } + + next(); +}; + +export const createAdminRouter = (): express.Router => { + const adminRouter: express.Router = express.Router({ mergeParams: true }); + + // Apply authentication middleware if API key is configured + adminRouter.use(authenticateAdmin); + + adminRouter.post( + '/stop', + asyncHandler(async (req, res) => { + try { + // Check if shutdown is already in progress + if (getIsShuttingDown()) { + adminLogger.warn('shutdown already in progress, ignoring duplicate request'); + res.status(200).json({ + success: true, + message: 'Shutdown already in progress', + }); + return; + } + + adminLogger.info('received shutdown request via /admin/stop endpoint'); + + // Send response immediately before shutdown + res.status(200).json({ + success: true, + message: 'Graceful shutdown initiated', + }); + + process.kill(process.pid, 'SIGTERM'); + } catch (error) { + adminLogger.error({ error }, 'error initiating shutdown'); + res.status(500).json({ + success: false, + message: 'Failed to initiate shutdown', + error: error instanceof Error ? error.message : String(error), + }); + } + }), + ); + + return adminRouter; +}; diff --git a/src/routes/health.ts b/src/routes/health.ts new file mode 100644 index 0000000..01ee3f1 --- /dev/null +++ b/src/routes/health.ts @@ -0,0 +1,47 @@ +import { MAIN_CONFIG } from '../config'; +import express from 'express'; +import { createLogger } from '../util/logger'; +import { isLive, isReady, getSolutionGroupsDetailsStatus } from '../health/health'; +import asyncHandler from 'express-async-handler'; + +export const createHealthRouter = (): express.Router | null => { + if (!MAIN_CONFIG.ENABLE_HEALTH_API) { + return null; + } + + const healthLogger = createLogger('Health'); + + const healthRouter: express.Router = express.Router({ mergeParams: true }); + + healthRouter.get('/health/liveness', (req, res) => { + const health = isLive(); + + healthLogger.debug('requested liveness'); + + res.json(health); + }); + + healthRouter.get( + '/health/readiness', + asyncHandler(async (req, res) => { + const result = await isReady(); + + healthLogger.debug(result, 'requested readiness'); + + res.json(result); + }), + ); + + healthRouter.get( + '/health/status', + asyncHandler(async (req, res) => { + const result = await getSolutionGroupsDetailsStatus(); + + healthLogger.debug(result, 'requested solution groups details status'); + + res.json(result); + }), + ); + + return healthRouter; +}; diff --git a/src/routes/status.ts b/src/routes/status.ts new file mode 100644 index 0000000..a3d4fac --- /dev/null +++ b/src/routes/status.ts @@ -0,0 +1,14 @@ +import express from 'express'; +import { getAppState } from '../util/status'; + +export const createStatusRouter = (): express.Router => { + const statusRouter = express.Router({ mergeParams: true }); + + statusRouter.get('/status', (_, res) => { + res.status(200).json({ + status: getAppState(), + }); + }); + + return statusRouter; +}; diff --git a/src/routes/token.ts b/src/routes/token.ts new file mode 100644 index 0000000..23c3e73 --- /dev/null +++ b/src/routes/token.ts @@ -0,0 +1,20 @@ +import express from 'express'; +import asyncHandler from 'express-async-handler'; +import { getToken } from '../auth/login'; + +export const createTokenRouter = (): express.Router => { + const tokenRouter = express.Router(); + + tokenRouter.get( + '/token', + asyncHandler(async (req, res) => { + const token = await getToken().catch(() => null); + + res.status(200).json({ + token, + }); + }), + ); + + return tokenRouter; +}; diff --git a/src/routes/vote.ts b/src/routes/vote.ts new file mode 100644 index 0000000..6bbb7d1 --- /dev/null +++ b/src/routes/vote.ts @@ -0,0 +1,98 @@ +import express from 'express'; +import { z } from 'zod'; +import asyncHandler from 'express-async-handler'; +import { getSolutionNamespace } from '../node-red/red'; +import { VOTE_STORAGE, voteQueue, voteQueueLogger } from '../polkadot/vote'; + +const SUBMIT_VOTE_SCHEMA = z + .object({ + id: z.string(), + noderedId: z.string(), + root: z.string(), + hashVote: z.boolean().optional().default(true), + }) + .refine( + (data) => { + if (!data.hashVote) { + return Buffer.byteLength(data.root, 'utf8') <= 32; + } + return true; + }, + { + message: 'if not hashing the vote it must be no longer than 32 bytes', + path: ['root'], + }, + ); + +export const createVoteRouter = (): express.Router => { + const voteRouter = express.Router({ mergeParams: true }); + + voteRouter.get('/queue-info', (_, res) => { + res.json({ + pendingTasks: voteQueue.length(), + isIdle: voteQueue.idle(), + runningTasks: voteQueue.running(), + }); + }); + + voteRouter.get('/sse/:id', (req, res) => { + if (req.query?.voteIdentifier == null) { + res.status(200).json({ + hash: null, + }); + + return; + } + + void VOTE_STORAGE.get(req.query.voteIdentifier as string).then((hash) => { + res.status(200).json({ + hash, + }); + }); + }); + + voteRouter.post( + '/sse/:id', + asyncHandler(async (req, res) => { + const { hashVote, id, noderedId, root } = SUBMIT_VOTE_SCHEMA.parse(req.body); + + const solutionNamespace: string | null = await getSolutionNamespace(noderedId); + + if (solutionNamespace == null) { + voteQueueLogger.error({ solutionNamespace }, 'solution is not present in nodered'); + + res.status(204).json(); + + return; + } + + const payload = { + votingRoundId: id, + noderedId, + vote: root, + solutionNamespace, + hashVote, + }; + + try { + voteQueueLogger.info({ payload }, 'sending vote to queue'); + + res.status(204).json(); + + await voteQueue.push({ + startedAt: Date.now(), + voteIdentifier: (req.query.voteIdentifier as string) ?? null, + ...payload, + }); + } catch (e) { + voteQueueLogger.error(payload, 'failed to submit vote'); + + voteQueueLogger.error(e); + + res.status(204).json(); + } + }), + ); + + return voteRouter; +}; diff --git a/src/worker-config.ts b/src/routes/worker-config.ts similarity index 62% rename from src/worker-config.ts rename to src/routes/worker-config.ts index ed02f6d..2993c63 100644 --- a/src/worker-config.ts +++ b/src/routes/worker-config.ts @@ -1,8 +1,10 @@ import express from 'express'; -import { MAIN_CONFIG } from './config'; -import { createKeyringPair } from './account'; -import { getSolutionLogicalParts } from './node-red/red'; import asyncHandler from 'express-async-handler'; +import { createKeyringPair } from '../polkadot/account'; +import { getBaseUrls } from '../util/base-urls'; +import { getSolutionLogicalParts } from '../node-red/red'; +import { AppVersion } from '../util/version'; +import { MAIN_CONFIG } from '../config'; export const createConfigRouter = (): express.Router => { const router = express.Router({ @@ -13,6 +15,7 @@ export const createConfigRouter = (): express.Router => { '/config', asyncHandler(async (req, res) => { const account = createKeyringPair(); + const baseUrls = await getBaseUrls(); const solutionDetails: { solutionNamespace: string; @@ -23,9 +26,12 @@ export const createConfigRouter = (): express.Router => { : null; res.status(200).json({ - rpcUrl: MAIN_CONFIG.PALLET_RPC_URL, + rpcUrl: baseUrls.rpcUrl, workerAddress: account.address, + appVersion: AppVersion, solutionDetails, + baseUrlsSource: MAIN_CONFIG.BASE_URLS, + baseUrls, }); }), ); diff --git a/src/shutdown.ts b/src/shutdown.ts new file mode 100644 index 0000000..8b50316 --- /dev/null +++ b/src/shutdown.ts @@ -0,0 +1,139 @@ +import type * as http from 'http'; +import { createLogger } from './util/logger'; +import { MAIN_CONFIG } from './config'; +import * as RED from 'node-red'; +import { drainVoteQueue } from './polkadot/vote'; + +const logger = createLogger('Shutdown'); + +// Store references to resources that need cleanup +let mainServer: http.Server | null = null; +let nodeRedServer: http.Server | null = null; +let adminServer: http.Server | null = null; + +let isShuttingDown = false; + +export const getIsShuttingDown = (): boolean => { + return isShuttingDown; +}; + +// Setter functions to register resources +export const setMainServer = (server: http.Server): void => { + mainServer = server; +}; + +export const setNodeRedServer = (server: http.Server): void => { + nodeRedServer = server; +}; + +export const setAdminServer = (server: http.Server): void => { + adminServer = server; +}; + +// Main shutdown function +export const gracefulShutdown = async ( + timeoutMs: number = MAIN_CONFIG.SHUTDOWN_TIMEOUT_MS, +): Promise => { + // 1. Idempotency check + if (isShuttingDown) { + logger.warn('shutdown already in progress'); + return; + } + + isShuttingDown = true; + logger.info('initiating graceful shutdown'); + + // 2. Set exit code early to allow graceful completion + process.exitCode = 0; + + // 3. Set up timeout protection + const shutdownTimeout = setTimeout(() => { + logger.error('graceful shutdown timeout exceeded, forcing exit'); + process.exit(1); + }, timeoutMs); + + try { + // 4. Cleanup sequence + + // Drain vote queue (wait for pending votes to complete) + await drainVoteQueue(); + logger.info('vote queue drained'); + + // Stop Node-RED runtime + try { + await RED.stop(); + logger.info('Node-RED runtime stopped'); + } catch (error) { + logger.error({ error }, 'error stopping Node-RED runtime'); + } + + // Close HTTP servers (stop accepting new connections) + if (nodeRedServer != null) { + try { + await new Promise((resolve) => { + nodeRedServer?.close(() => { + logger.info('Node-RED HTTP server closed'); + resolve(); + }); + nodeRedServer?.closeIdleConnections(); + }); + } catch (error) { + logger.error({ error }, 'error closing Node-RED server'); + } + } + + if (mainServer != null) { + try { + await new Promise((resolve) => { + mainServer?.close(() => { + logger.info('main HTTP server closed'); + resolve(); + }); + mainServer?.closeIdleConnections(); + }); + } catch (error) { + logger.error({ error }, 'error closing main server'); + } + } + + // Close admin server + if (adminServer != null) { + try { + await new Promise((resolve) => { + adminServer?.close(() => { + logger.info('admin HTTP server closed'); + resolve(); + }); + adminServer?.closeIdleConnections(); + }); + } catch (error) { + logger.error({ error }, 'error closing admin server'); + } + } + + // 5. Clear timeout and complete + clearTimeout(shutdownTimeout); + logger.info('graceful shutdown completed'); + } catch (error) { + // 6. Error handling + clearTimeout(shutdownTimeout); + logger.error({ error }, 'error during graceful shutdown'); + process.exitCode = 1; + throw error; + } +}; + +// Register signal handlers for graceful shutdown +process.on('SIGTERM', () => { + logger.info('SIGTERM received, initiating graceful shutdown'); + void gracefulShutdown().then(() => { + process.exit(process.exitCode ?? 0); + }); +}); + +process.on('SIGINT', () => { + logger.info('SIGINT received, initiating graceful shutdown'); + void gracefulShutdown().then(() => { + process.exit(process.exitCode ?? 0); + }); +}); diff --git a/src/solution-source/ipfs.ts b/src/solution-source/ipfs.ts index f8ec3c8..1a2c164 100644 --- a/src/solution-source/ipfs.ts +++ b/src/solution-source/ipfs.ts @@ -1,6 +1,6 @@ import axios, { type AxiosInstance } from 'axios'; -import { createLogger } from '../util'; import { MAIN_CONFIG } from '../config'; +import { createLogger } from '../util/logger'; const ipfsLogger = createLogger('IPFS'); diff --git a/src/solution-source/local.ts b/src/solution-source/local.ts index b806807..f0191f3 100644 --- a/src/solution-source/local.ts +++ b/src/solution-source/local.ts @@ -1,6 +1,7 @@ import pino from 'pino'; import { MAIN_CONFIG } from '../config'; import { existsSync, readFileSync } from 'fs'; +import { LocalSolutionsFileNotFoundError } from '../errors'; const localLogger = pino({ name: 'LocalReaderLogger', @@ -15,7 +16,7 @@ if (MAIN_CONFIG.LOCAL_SOLUTIONS_PATH != null) { `smartflow file on this path does not exists`, ); - throw new Error('smartflow file on this path does not exists'); + throw new LocalSolutionsFileNotFoundError(); } } diff --git a/src/solution.ts b/src/solution.ts index e2fcdcb..3a7e252 100644 --- a/src/solution.ts +++ b/src/solution.ts @@ -7,9 +7,7 @@ import { getTabNodes, upsertSolution, } from './node-red/red'; -import { type RedNode, type RedNodes } from './types'; import { type Logger } from 'pino'; -import { createLogger, createReadPalletApi, sleep } from './util'; import { MAIN_CONFIG } from './config'; import { type NodeRedSolutionCache, setNodeRedSolutionCache } from './node-red/node-red-cache'; import { @@ -26,28 +24,39 @@ import { type SolutionGroupId, } from './polkadot/polka'; import { type SolutionGroup } from './polkadot/polka-types'; +import { sleep } from './util/sleep'; +import { createLogger } from './util/logger'; +import { createReadPalletApi } from './util/pallet-api'; +import { type RedNode, type RedNodes } from './node-red/types'; const logger = createLogger('SolutionLoop'); export const pushToQueue = async (account: KeyringPair): Promise => { // eslint-disable-next-line no-constant-condition - const api: ApiPromise = await retryHttpAsyncCall(async () => await createReadPalletApi()); const timeout: number = MAIN_CONFIG.SOLUTION_QUEUE_PROCESS_DELAY; // eslint-disable-next-line no-constant-condition while (true) { - await processSolutionQueue(api, account).catch(async (e) => { - logger.error('failed to complete queue loop'); - logger.error(e); + try { + const api: ApiPromise = await retryHttpAsyncCall(async () => await createReadPalletApi()); - await sleep(50000); - await pushToQueue(account); - await api.disconnect(); - }); + await processSolutionQueue(api, account).catch(async (e) => { + logger.error('failed to complete queue loop'); + logger.error(e); + + await sleep(50000); + await pushToQueue(account); + await api.disconnect(); + }); - await api.disconnect(); - await sleep(timeout); + await api.disconnect(); + await sleep(timeout); + } catch (e) { + logger.error('Critical error in pushToQueue loop, retrying in 50s'); + logger.error(e); + await sleep(50000); + } } }; @@ -100,7 +109,7 @@ async function processSolutionQueue(api: ApiPromise, workerAccount: KeyringPair) operatorSubscriptions, ); - const unfilteredSolutions: SolutionArray = await getSolutions(api); + const unfilteredSolutions: SolutionArray = await getSolutions(api, operatorSubscriptions); const solutions: SolutionArray = unfilteredSolutions.filter((solution) => operatorSubscriptions.includes(solution[1]), @@ -144,16 +153,34 @@ async function processSolutionQueue(api: ApiPromise, workerAccount: KeyringPair) return; } - for (const solution of activeTargetSolutions) { - const workLogic: string = solution[2].workload.workLogic; + const solutionGroupStatus = new Set(); - const isSuccesful: boolean = await hasValidGroupConfiguration( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for (const [_, solutionGroup] of Object.entries(solutionGroups)) { + const hasValidConfiguration: boolean = await hasValidGroupConfiguration( api, operatorAddress, - solutionGroups[solution[1]], + solutionGroup, ); - if (!isSuccesful) { + if (!hasValidConfiguration) { + logger.warn( + { solutionGroupId: solutionGroup.namespace }, + 'operator does not meet criteria for solution group, skipping installation', + ); + + continue; + } + + solutionGroupStatus.add(solutionGroup.namespace); + } + + for (const solution of activeTargetSolutions) { + const workLogic: string = solution[2].workload.workLogic; + + const fulfillsSolutionGroupCriteria = solutionGroupStatus.has(solution[1]); + + if (!fulfillsSolutionGroupCriteria) { logger.warn( { solutionId: solution[0], diff --git a/src/util.ts b/src/util.ts deleted file mode 100644 index 1c1258b..0000000 --- a/src/util.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { type ApiPromise } from '@polkadot/api'; -import pino, { type Logger, type LoggerOptions } from 'pino'; -import { MAIN_CONFIG } from './config'; -import type { EwxTxManager } from './ewx-tx-manager'; -import { EwxHttpTxManager } from './ewx-tx-manager-http'; -import { createApi, retryHttpAsyncCall } from './polkadot/polka'; - -// eslint-disable-next-line @typescript-eslint/explicit-function-return-type -export const invertObject = (obj: Record) => { - if (obj == null) { - throw new Error('obj is null'); - } - - return Object.fromEntries(Object.entries(obj).map(([key, value]) => [value, key])); -}; - -export const sleep = async (ms: number): Promise => { - await new Promise((resolve) => setTimeout(resolve, ms)); -}; - -export const createReadPalletApi = async (): Promise => { - const palletRpcUrl: string = MAIN_CONFIG.PALLET_RPC_URL; - - return await retryHttpAsyncCall(async () => await createApi(palletRpcUrl)); -}; - -export const createWritePalletApi = async (): Promise => { - const votingRpcUrl: string = MAIN_CONFIG.VOTING_RPC_URL; - - return await retryHttpAsyncCall(async () => await createApi(votingRpcUrl)); -}; - -export const createEwxTxManager = (): EwxTxManager => { - return new EwxHttpTxManager(new URL(MAIN_CONFIG.VOTING_RPC_URL)); -}; - -export const createLogger = (options: string | LoggerOptions): Logger => { - if (MAIN_CONFIG.PRETTY_PRINT) { - const prettyTransport = pino.transport({ - target: 'pino-pretty', - options: { - colorize: true, - levelFirst: true, - translateTime: 'SYS:HH:MM:ss', - }, - }); - - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - return pino(prettyTransport); - } - - return pino({ - ...(typeof options === 'string' ? { name: options } : options), - }); -}; diff --git a/src/util/base-urls.ts b/src/util/base-urls.ts new file mode 100644 index 0000000..f8430a3 --- /dev/null +++ b/src/util/base-urls.ts @@ -0,0 +1,113 @@ +import { z } from 'zod'; +import axios from 'axios'; +import { createLogger } from './logger'; +import { CACHE } from './cache'; +import { MAIN_CONFIG } from '../config'; +import { BaseUrlsInvalidFormatError, FailedToFetchBaseUrlsError } from '../errors'; + +const BASE_URLS_CACHE_KEY = 'BASE_URLS'; +const logger = createLogger('BaseUrls'); + +const BaseUrlsConfigSchema = z + .object({ + workers_nominator_url: z.string().url().optional(), + cas_normalizer_url: z.string().url().optional(), + kafka_url: z.union([z.string(), z.array(z.string())]).optional(), + kafka_proxy_url: z.string().url().optional(), + indexer_url: z.string().optional(), + base_indexer_url: z.string().url(), + rpc_url: z.string().url(), + workers_registry_url: z.string().url(), + auth_server_url: z.string().url(), + }) + .transform((data) => ({ + workersNominatorUrl: data.workers_nominator_url, + casNormalizerUrl: data.cas_normalizer_url, + kafkaUrl: data.kafka_url, + kafkaProxyUrl: data.kafka_proxy_url, + indexerUrl: data.indexer_url, + baseIndexerUrl: data.base_indexer_url, + rpcUrl: data.rpc_url, + workersRegistryUrl: data.workers_registry_url, + authServerUrl: data.auth_server_url, + })); + +export type BaseUrlsConfig = z.infer; + +export const getBaseUrls = async (): Promise => { + const cacheHit = await CACHE.get(BASE_URLS_CACHE_KEY); + + if (cacheHit != null) { + return cacheHit; + } + + logger.info( + { + baseUrls: MAIN_CONFIG.BASE_URLS, + }, + 'updating urls references from base_urls', + ); + + const receivedConfig = await axios + .get(MAIN_CONFIG.BASE_URLS) + .then((x) => x.data) + .catch((e) => { + logger.error('failed to fetch base url'); + logger.error(e); + + return false; + }); + + try { + const parsed = BaseUrlsConfigSchema.parse(receivedConfig); + + const rewritten: BaseUrlsConfig = { + ...parsed, + workersRegistryUrl: MAIN_CONFIG.WORKER_REGISTRY_URL ?? parsed.workersRegistryUrl, + authServerUrl: MAIN_CONFIG.PALLET_AUTH_SERVER_LOGIN_URL ?? parsed.authServerUrl, + rpcUrl: MAIN_CONFIG.PALLET_RPC_URL ?? parsed.rpcUrl, + }; + + logger.info( + { + baseUrls: rewritten, + }, + 'updated urls references from base_urls', + ); + + await CACHE.set(BASE_URLS_CACHE_KEY, rewritten, 300 * 1000); + + return rewritten; + } catch (e) { + if (e instanceof z.ZodError) { + logger.error( + { + issues: e.issues, + }, + 'failed to parse base urls config', + ); + + logger.error(e); + + throw new BaseUrlsInvalidFormatError(); + } + + if (axios.isAxiosError(e)) { + logger.error( + { + status: e.response?.status, + statusText: e.response?.statusText, + data: e.response?.data, + }, + 'failed to fetch base urls', + ); + + throw new FailedToFetchBaseUrlsError(); + } + + logger.error('failed to parse base urls config'); + logger.error(e); + + throw e; + } +}; diff --git a/src/util/cache.ts b/src/util/cache.ts new file mode 100644 index 0000000..eb4a788 --- /dev/null +++ b/src/util/cache.ts @@ -0,0 +1,5 @@ +import { createCache } from 'cache-manager'; + +export const CACHE = createCache({ + ttl: 600 * 1000, +}); diff --git a/src/util/ewx-tx-manager.ts b/src/util/ewx-tx-manager.ts new file mode 100644 index 0000000..39e8b5d --- /dev/null +++ b/src/util/ewx-tx-manager.ts @@ -0,0 +1,7 @@ +import { MAIN_CONFIG } from '../config'; +import { EwxHttpTxManager } from '../polkadot/ewx-tx-manager-http'; +import { type EwxTxManager } from '../polkadot/ewx-tx-manager'; + +export const createEwxTxManager = (): EwxTxManager => { + return new EwxHttpTxManager(new URL(MAIN_CONFIG.VOTING_RPC_URL)); +}; diff --git a/src/util/invert-object.ts b/src/util/invert-object.ts new file mode 100644 index 0000000..f32f3be --- /dev/null +++ b/src/util/invert-object.ts @@ -0,0 +1,10 @@ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +import { InvertObjectEmptyObjectError } from '../errors'; + +export const invertObject = (obj: Record): any => { + if (obj == null) { + throw new InvertObjectEmptyObjectError(); + } + + return Object.fromEntries(Object.entries(obj).map(([key, value]) => [value, key])); +}; diff --git a/src/util/logger.ts b/src/util/logger.ts new file mode 100644 index 0000000..42b73d5 --- /dev/null +++ b/src/util/logger.ts @@ -0,0 +1,127 @@ +import { MAIN_CONFIG } from '../config'; +import pino, { type Logger, type LoggerOptions } from 'pino'; +import * as path from 'path'; +import * as fs from 'fs'; +import hash from 'object-hash'; + +const transportCache = new Map>(); + +const normalizeLogPath = (logPath: string): string => { + if (logPath.endsWith('/') || logPath.endsWith('\\')) { + return path.join(logPath, 'app.log'); + } + if (path.extname(logPath).length === 0) { + return `${logPath}.log`; + } + return logPath; +}; + +const ensureLogDirExists = (logPath: string): void => { + const dir = path.dirname(logPath); + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } +}; + +const getFileTransport = ( + logPath: string, + retentionDays?: number, +): ReturnType => { + const normalizedPath = normalizeLogPath(logPath); + ensureLogDirExists(normalizedPath); + + const parsed = path.parse(normalizedPath); + const options = { + file: path.join(parsed.dir, parsed.name), + frequency: 'daily', + dateFormat: 'yyyy-MM-dd', + mkdir: true, + extension: parsed.ext, + stream: { + highWaterMark: 1024 * 1024 * 2, // 2MB memory buffer + autoDestroy: true, + emitClose: true, + }, + ...(retentionDays != null && retentionDays > 0 + ? { + limit: { + count: retentionDays, + removeOtherLogFiles: true, + }, + } + : {}), + }; + + const cacheKey = `file:${hash(options)}`; + const cached = transportCache.get(cacheKey); + if (cached != null) { + return cached; + } + + const transport = pino.transport({ + target: 'pino-roll', + options, + }); + transportCache.set(cacheKey, transport); + return transport; +}; + +const getPrettyTransport = (): ReturnType => { + const options = { + colorize: true, + levelFirst: true, + translateTime: 'SYS:HH:MM:ss', + }; + + const cacheKey = `pretty:${hash(options)}`; + const cached = transportCache.get(cacheKey); + if (cached != null) { + return cached; + } + + const transport = pino.transport({ + target: 'pino-pretty', + options, + }); + transportCache.set(cacheKey, transport); + return transport; +}; + +export const createLogger = (options: string | LoggerOptions): Logger => { + const loggerOptions = typeof options === 'string' ? { name: options } : options; + const level = loggerOptions.level ?? 'info'; + + const { LOG_FILE_PATH, PRETTY_PRINT, LOG_RETENTION_DAYS } = MAIN_CONFIG; + const hasFile = Boolean(LOG_FILE_PATH?.trim()); + const hasPretty = Boolean(PRETTY_PRINT); + + // No transports: simple console logger + if (!hasFile && !hasPretty) { + return pino({ ...loggerOptions, level }); + } + + // Both file and pretty + if (hasFile && hasPretty && LOG_FILE_PATH != null) { + const fileTransport = getFileTransport(LOG_FILE_PATH, LOG_RETENTION_DAYS); + const prettyTransport = getPrettyTransport(); + return pino( + { ...loggerOptions, level }, + pino.multistream([ + { level, stream: fileTransport }, + { level, stream: prettyTransport }, + ]), + ); + } + + // File only + if (hasFile && LOG_FILE_PATH != null) { + const fileTransport = getFileTransport(LOG_FILE_PATH, LOG_RETENTION_DAYS); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + return pino({ ...loggerOptions, level }, fileTransport); + } + + // Pretty only + const prettyTransport = getPrettyTransport(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + return pino({ ...loggerOptions, level }, prettyTransport); +}; diff --git a/src/util/operator-address-cache.ts b/src/util/operator-address-cache.ts new file mode 100644 index 0000000..8401f3a --- /dev/null +++ b/src/util/operator-address-cache.ts @@ -0,0 +1,80 @@ +import type { ApiPromise } from '@polkadot/api'; +import { createCache } from 'cache-manager'; +import { createLogger } from './logger'; +import { getOperatorAddress, retryHttpAsyncCall } from '../polkadot/polka'; +import { createReadPalletApi } from './pallet-api'; + +const logger = createLogger('OperatorAddressCache'); + +const OPERATOR_ADDRESS_CACHE_TTL_MS = 300000; + +// In-memory cache with TTL so operator changes are reflected while WNS is running +const operatorAddressCache = createCache({ + ttl: OPERATOR_ADDRESS_CACHE_TTL_MS, +}); + +export const saveOperatorAddress = async ( + workerAddress: string, + operatorAddress: string, +): Promise => { + await operatorAddressCache.set(workerAddress, operatorAddress, OPERATOR_ADDRESS_CACHE_TTL_MS); +}; + +export const getCachedOperatorAddress = async ( + workerAddress: string, + timeoutMs: number = 5000, +): Promise => { + // Return cached value if available (undefined when missing or expired) + const cached = await operatorAddressCache.get(workerAddress); + if (cached != null && typeof cached === 'string') { + return cached; + } + + // Query chain if not cached + let api: ApiPromise | null = null; + + try { + const apiPromise = retryHttpAsyncCall(async () => await createReadPalletApi()); + const timeoutPromise = new Promise((_resolve, reject) => { + setTimeout(() => { + reject(new Error('Operator address fetch timeout')); + }, timeoutMs); + }); + + api = await Promise.race([apiPromise, timeoutPromise]); + + if (api == null) { + return null; + } + + const apiInstance = api; + const operatorAddressPromise = retryHttpAsyncCall( + async () => await getOperatorAddress(apiInstance, workerAddress), + ); + const operatorAddress = await Promise.race([operatorAddressPromise, timeoutPromise]).catch( + () => null, + ); + + if (operatorAddress != null) { + // Cache the result with TTL + await operatorAddressCache.set(workerAddress, operatorAddress, OPERATOR_ADDRESS_CACHE_TTL_MS); + return operatorAddress; + } + + logger.warn({ workerAddress }, 'no operator assigned to worker'); + return null; + } catch (error) { + logger.warn({ error, workerAddress }, 'failed to get operator address from chain'); + return null; + } finally { + if (api != null) { + await api.disconnect().catch(() => { + // Ignore disconnect errors + }); + } + } +}; + +export const clearOperatorAddress = async (workerAddress: string): Promise => { + await operatorAddressCache.del(workerAddress); +}; diff --git a/src/util/operator-info.ts b/src/util/operator-info.ts new file mode 100644 index 0000000..36d7ab6 --- /dev/null +++ b/src/util/operator-info.ts @@ -0,0 +1,138 @@ +import { getAllInstalledSolutionsWithGroups, type InstalledSolutionDetails } from '../node-red/red'; +import { createKeyringPair } from '../polkadot/account'; +import { getCachedOperatorAddress } from './operator-address-cache'; +import { createLogger } from './logger'; + +const logger = createLogger('OperatorInfo'); + +// UUID regex (RFC4122: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) +const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +/** + * Extracts a display name from solutionId for health/operator info. + * + * Use case: EWX solution IDs often follow "name.uuid" (e.g. "newTestGPSaaS.1348d595-ccc4-4a38-85ad-f0e31cc7f410"). + * When the segment after the last dot is a valid UUID, we use the prefix as the human-readable name; + * otherwise we return the full solutionId (e.g. "a.b.c" or "simpleName"). + * + * Edge cases: if stripping would yield an empty name (e.g. ".uuid" or malformed ids), returns the + * full solutionId so the UI never shows an empty name. + */ +const extractSolutionName = (solutionId: string): string => { + if (solutionId.length === 0) return solutionId; + + const lastDot = solutionId.lastIndexOf('.'); + if (lastDot === -1) return solutionId; + + const lastSegment = solutionId.substring(lastDot + 1); + if (!UUID_REGEX.test(lastSegment)) return solutionId; + + const name = solutionId.substring(0, lastDot); + return name.length > 0 ? name : solutionId; +}; + +export enum SolutionStatus { + ACTIVE = 'Active', +} + +export interface SolutionInfo { + id: string; + name: string; + status: SolutionStatus | string; + installed: boolean; +} + +export interface SolutionGroupInfo { + id: string; + name?: string; + hasCidAllowList?: boolean; + solutions: SolutionInfo[]; +} + +export interface OperatorInfo { + address: string; + subscriptions: string[]; + solutionGroups: SolutionGroupInfo[]; +} + +export const getOperatorInfo = async (timeoutMs: number = 5000): Promise => { + try { + const account = createKeyringPair(); + + // Get installed solutions with groups from Node-RED (local, fast) + let installedSolutionsWithGroups: InstalledSolutionDetails[] = []; + try { + installedSolutionsWithGroups = await getAllInstalledSolutionsWithGroups(); + } catch (error) { + logger.error({ error }, 'failed to get installed solutions with groups'); + return null; + } + + if (installedSolutionsWithGroups.length === 0) { + // No installed solutions - still need operator address + const operatorAddress = await getCachedOperatorAddress(account.address, timeoutMs); + + if (operatorAddress == null) { + return null; + } + + return { + address: operatorAddress, + subscriptions: [], + solutionGroups: [], + }; + } + + // Extract unique solution group IDs from installed solutions (local data) + const uniqueSolutionGroupIds = [ + ...new Set(installedSolutionsWithGroups.map((s) => s.solutionGroupId)), + ]; + + // Get operator address (required) - getCachedOperatorAddress handles cache and chain query + const operatorAddress = await getCachedOperatorAddress(account.address, timeoutMs); + + if (operatorAddress == null) { + return null; + } + + // Group installed solutions by solutionGroupId (local data) + const solutionGroupsMap = new Map(); + for (const solution of installedSolutionsWithGroups) { + const group = solutionGroupsMap.get(solution.solutionGroupId) ?? []; + group.push(solution); + solutionGroupsMap.set(solution.solutionGroupId, group); + } + + // Build solution groups from local data + const solutionGroups: SolutionGroupInfo[] = Array.from(solutionGroupsMap.entries()).map( + ([solutionGroupId, solutions]) => { + const solutionInfos: SolutionInfo[] = solutions.map((solution) => { + // Extract name from solutionId, handling UUID suffix if present + const name = extractSolutionName(solution.solutionId); + + return { + id: solution.solutionId, + name, + status: SolutionStatus.ACTIVE, // Assume active if installed + installed: true, // All are installed + }; + }); + + return { + id: solutionGroupId, + name: solutionGroupId, // Use groupId as name (we don't have name from chain) + solutions: solutionInfos, + }; + }, + ); + + return { + address: operatorAddress, + subscriptions: uniqueSolutionGroupIds, + solutionGroups, + }; + } catch (error) { + logger.error({ error }, 'failed to get operator info'); + return null; + } +}; diff --git a/src/util/pallet-api.ts b/src/util/pallet-api.ts new file mode 100644 index 0000000..a073971 --- /dev/null +++ b/src/util/pallet-api.ts @@ -0,0 +1,9 @@ +import { type ApiPromise } from '@polkadot/api'; +import { createApi, retryHttpAsyncCall } from '../polkadot/polka'; +import { getBaseUrls } from './base-urls'; + +export const createReadPalletApi = async (): Promise => { + const { rpcUrl } = await getBaseUrls(); + + return await retryHttpAsyncCall(async () => await createApi(rpcUrl)); +}; diff --git a/src/util/sleep.ts b/src/util/sleep.ts new file mode 100644 index 0000000..0ce82be --- /dev/null +++ b/src/util/sleep.ts @@ -0,0 +1,3 @@ +export const sleep = async (ms: number): Promise => { + await new Promise((resolve) => setTimeout(resolve, ms)); +}; diff --git a/src/status.ts b/src/util/status.ts similarity index 65% rename from src/status.ts rename to src/util/status.ts index 7e70874..0344fc2 100644 --- a/src/status.ts +++ b/src/util/status.ts @@ -1,5 +1,4 @@ -import express from 'express'; -import { createLogger } from './util'; +import { createLogger } from './logger'; export enum APP_BOOTSTRAP_STATUS { STARTED = 'STARTED', @@ -14,6 +13,8 @@ const statusLogger = createLogger('AppBootstrapStatus'); let APP_STATE: APP_BOOTSTRAP_STATUS = APP_BOOTSTRAP_STATUS.STARTED; +export const getAppState = (): APP_BOOTSTRAP_STATUS => APP_STATE; + export const setAppState = (state: APP_BOOTSTRAP_STATUS): void => { statusLogger.info( { @@ -25,15 +26,3 @@ export const setAppState = (state: APP_BOOTSTRAP_STATUS): void => { APP_STATE = state; }; - -export const createStatusRouter = (): express.Router => { - const statusRouter = express.Router({ mergeParams: true }); - - statusRouter.get('/status', (_, res) => { - res.status(200).json({ - status: APP_STATE, - }); - }); - - return statusRouter; -}; diff --git a/src/version.ts b/src/util/version.ts similarity index 69% rename from src/version.ts rename to src/util/version.ts index aa64ad0..963b115 100644 --- a/src/version.ts +++ b/src/util/version.ts @@ -1,6 +1,7 @@ import { z } from 'zod'; -import { MAIN_CONFIG } from './config'; -import { readFileSync, existsSync } from 'fs'; +import { existsSync, readFileSync } from 'fs'; +import { MAIN_CONFIG } from '../config'; +import { BuildMetadataPathNotFoundError } from '../errors'; const VERSION_SCHEMA = z.object({ version: z.string(), @@ -11,7 +12,7 @@ export type Version = z.infer; const readVersion = (): Version => { if (!existsSync(MAIN_CONFIG.BUILD_METADATA_PATH)) { - throw new Error('BUILD_METADATA_PATH does not exist'); + throw new BuildMetadataPathNotFoundError(); } const contents = readFileSync(MAIN_CONFIG.BUILD_METADATA_PATH).toString(); diff --git a/src/vote.ts b/src/vote.ts deleted file mode 100644 index 9b9eabe..0000000 --- a/src/vote.ts +++ /dev/null @@ -1,230 +0,0 @@ -import { Keyring } from '@polkadot/api'; -import { type KeyringPair } from '@polkadot/keyring/types'; -import express from 'express'; -import asyncHandler from 'express-async-handler'; -import type { queueAsPromised } from 'fastq'; -import * as fastq from 'fastq'; -import { z } from 'zod'; -import { MAIN_CONFIG } from './config'; -import { getSolutionNamespace } from './node-red/red'; -import { retryHttpAsyncCall, submitSolutionResult } from './polkadot/polka'; -import { createEwxTxManager, createLogger, sleep } from './util'; - -interface VoteTask { - votingRoundId: string; - noderedId: string; - vote: string; - startedAt: number; - solutionNamespace: string; - voteIdentifier: string | null; - hashVote: boolean; -} - -const SUBMIT_VOTE_SCHEMA = z - .object({ - id: z.string(), - noderedId: z.string(), - root: z.string(), - hashVote: z.boolean().optional().default(true), - }) - .refine( - (data) => { - if (!data.hashVote) { - return Buffer.byteLength(data.root, 'utf8') <= 32; - } - return true; - }, - { - message: 'if not hashing the vote it must be no longer than 32 bytes', - path: ['root'], - }, - ); - -const queue: queueAsPromised = fastq.promise(asyncWorker, 4); - -const ewxTxManager = createEwxTxManager(); - -const DELAY_TIMER: number = 30 * 1000; -const NINE_MINUTES = 540000; -const ONE_DAY_MS = 24 * 60 * 60 * 1000; - -const voteQueueLogger = createLogger('VoteQueue'); - -const voteStorage: Map = new Map< - string, - { transactionHash: string | null; createdAt: number } ->(); - -queue.error((error: Error | null, task: VoteTask) => { - if (error == null) { - return; - } - - voteQueueLogger.error({ task }, 'unexpected vote queue error'); - voteQueueLogger.error(error); -}); - -setInterval(() => { - const current = Date.now(); - - voteStorage.forEach(({ createdAt }, key) => { - if (createdAt + ONE_DAY_MS <= current) { - voteStorage.delete(key); - } - }); -}, 5000); - -export const createVoteRouter = (): express.Router => { - const voteRouter = express.Router({ mergeParams: true }); - - voteRouter.get('/queue-info', (_, res) => { - res.json({ - pendingTasks: queue.length(), - isIdle: queue.idle(), - runningTasks: queue.running(), - }); - }); - - voteRouter.get('/sse/:id', (req, res) => { - if (req.query?.voteIdentifier == null) { - res.status(200).json({ - hash: null, - }); - - return; - } - - res.status(200).json({ - hash: voteStorage.get(req.query.voteIdentifier as string) ?? null, - }); - }); - - voteRouter.post( - '/sse/:id', - asyncHandler(async (req, res) => { - const { hashVote, id, noderedId, root } = SUBMIT_VOTE_SCHEMA.parse(req.body); - - const solutionNamespace: string | null = await getSolutionNamespace(noderedId); - - if (solutionNamespace == null) { - voteQueueLogger.error({ solutionNamespace }, 'solution is not present in nodered'); - - res.status(204).json(); - - return; - } - - const payload = { - votingRoundId: id, - noderedId, - vote: root, - solutionNamespace, - hashVote, - }; - - try { - voteQueueLogger.info({ payload }, 'sending vote to queue'); - - res.status(204).json(); - - await queue.push({ - startedAt: Date.now(), - voteIdentifier: (req.query.voteIdentifier as string) ?? null, - ...payload, - }); - } catch (e) { - voteQueueLogger.error(payload, 'failed to submit vote'); - - voteQueueLogger.error(e); - - res.status(204).json(); - } - }), - ); - - return voteRouter; -}; - -async function asyncWorker(arg: VoteTask): Promise { - const tempLogger = createLogger({ - name: `Solution-Vote-${arg.solutionNamespace}`, - base: arg, - }); - - try { - if (Date.now() - arg.startedAt >= NINE_MINUTES) { - tempLogger.warn('timeout passed for vote, abandoning it'); - - return; - } - - await processVoteQueue(arg); - } catch (e) { - tempLogger.error(`failed to submit vote`); - tempLogger.error(e); - - if (e.toString() === 'TypeError: fetch failed') { - await sleep(DELAY_TIMER); - - tempLogger.warn('attempting to retry vote'); - await queue.push(arg); - } else { - tempLogger.warn('skipping vote, non-http error'); - } - } -} - -async function processVoteQueue(task: VoteTask): Promise { - const tempLogger = createLogger({ - name: `Solution-Vote-${task.solutionNamespace}`, - base: task, - }); - - const keyring = new Keyring({ type: 'sr25519' }); - - const account: KeyringPair = keyring.addFromMnemonic(MAIN_CONFIG.VOTING_WORKER_SEED); - - tempLogger.info('attempting to send vote'); - - await retryHttpAsyncCall( - async () => - await submitSolutionResult( - ewxTxManager, - account, - task.solutionNamespace, - task.vote, - task.votingRoundId, - task.hashVote, - ), - ) - .then((hash) => { - if (task.voteIdentifier != null) { - voteStorage.set(task.voteIdentifier, { - createdAt: Date.now(), - transactionHash: hash, - }); - } - - tempLogger.info( - { - transactionHash: hash, - }, - 'submitted vote', - ); - - tempLogger.flush(); - }) - .catch(async (e) => { - // EWX Error - (some) http errors are handled in retryHttpAsyncCall fn - tempLogger.error( - { - task, - }, - 'failed to submit solution result - voting wont be retried', - ); - tempLogger.error(e); - tempLogger.flush(); - - // We do not throw for these kind of errors so we can continue processing - }); -}