Skip to content

Commit 280c823

Browse files
authored
refactor: extracts price feeding from hermes client (#10)
* refactor: extracts price feeding from hermes client * refactor: separate price consumeption and updates into separate streams
1 parent d61a04a commit 280c823

16 files changed

Lines changed: 785 additions & 407 deletions

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,12 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}'
175175
| `WALLET_SECRET` | Yes | - | Either `privateKey:<private key in hex format>` or `mnemonic:<12/24 words>` |
176176
| `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API |
177177
| `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) |
178-
| `UPDATE_INTERVAL_MS` | No | `300000` | Update interval (5 min) |
178+
| `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) |
179179
| `GAS_PRICE` | No | `0.025uakt` | Gas price |
180180
| `DENOM` | No | `uakt` | Token denomination |
181181
| `HEALTHCHECK_PORT` | No | 3000 | healthcheck server port |
182182
| `OTEL_RESOURCE_ATTRIBUTES` | No | <empty> | additional attributes attached to all metrics (e.g., `service.name=hermes,service.version=1.1.0,deployment.environment=production`) |
183+
| `SMART_CONTRACT_CONFIG_CACHE_TTL_MS` | No | 3600000 (1h) | smart contract config cache ttl in milliseconds |
183184

184185
### Instrumentation
185186

src/cli-commands/command-config.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,32 +64,32 @@ describe("parseConfig", () => {
6464
expect((result as Extract<typeof result, { ok: true }>).value.rpcEndpoint).toBe("https://custom-rpc:443");
6565
});
6666

67-
it("passes HERMES_ENDPOINT to config", () => {
67+
it("accepts HERMES_ENDPOINT and produces priceProducerFactory", () => {
6868
const result = parseConfig(validEnv({ HERMES_ENDPOINT: "https://hermes.example.com" }));
6969

7070
expect(result.ok).toBe(true);
71-
expect((result as Extract<typeof result, { ok: true }>).value.hermesEndpoint).toBe("https://hermes.example.com");
71+
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
7272
});
7373

74-
it("uses default hermesEndpoint when HERMES_ENDPOINT is not provided", () => {
74+
it("uses default HERMES_ENDPOINT and produces priceProducerFactory", () => {
7575
const result = parseConfig(validEnv());
7676

7777
expect(result.ok).toBe(true);
78-
expect((result as Extract<typeof result, { ok: true }>).value.hermesEndpoint).toBe("https://hermes.pyth.network");
78+
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
7979
});
8080

81-
it("parses UPDATE_INTERVAL_MS as integer", () => {
81+
it("accepts UPDATE_INTERVAL_MS and produces priceProducerFactory", () => {
8282
const result = parseConfig(validEnv({ UPDATE_INTERVAL_MS: "5000" }));
8383

8484
expect(result.ok).toBe(true);
85-
expect((result as Extract<typeof result, { ok: true }>).value.updateIntervalMs).toBe(5000);
85+
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
8686
});
8787

88-
it("uses default updateIntervalMs when UPDATE_INTERVAL_MS is not provided", () => {
88+
it("uses default UPDATE_INTERVAL_MS and produces priceProducerFactory", () => {
8989
const result = parseConfig(validEnv());
9090

9191
expect(result.ok).toBe(true);
92-
expect((result as Extract<typeof result, { ok: true }>).value.updateIntervalMs).toBe(5 * 60 * 1000);
92+
expect((result as Extract<typeof result, { ok: true }>).value.priceProducerFactory).toBeTypeOf("function");
9393
});
9494

9595
it("returns error when UPDATE_INTERVAL_MS is not a valid integer", () => {

src/cli-commands/command-config.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import { z } from "zod";
22
import { HermesClient, type HermesConfig } from "../hermes-client.ts";
33
import { validateContractAddress, validateWalletSecret } from "../validation.ts";
4+
import type { PriceProducerFactoryOptions } from "../types.ts";
5+
import { pollPriceStream } from "../price-stream/polling-price-stream.ts";
46

57
export interface CommandConfig extends HermesConfig {
68
createHermesClient: (config: HermesConfig) => Promise<HermesClient>;
79
signal: AbortSignal;
810
healthcheckPort: number;
11+
rawConfig: z.infer<typeof configSchema>;
912
}
1013

1114
const configSchema = z.object({
@@ -38,11 +41,12 @@ const configSchema = z.object({
3841
});
3942
}
4043
}).optional(),
41-
UPDATE_INTERVAL_MS: z.coerce.number().int().min(1000).positive().default(5 * 60 * 1000), // Default to 5 minutes
44+
UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds
4245
HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000),
4346
GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"),
4447
DENOM: z.string().default("uakt"),
4548
NODE_ENV: z.enum(["development", "production"]).optional(),
49+
SMART_CONTRACT_CONFIG_CACHE_TTL_MS: z.coerce.number().int().min(1000).positive().default(60 * 60 * 1000),
4650
});
4751

4852
type ParsedConfig = Omit<CommandConfig, "signal" | "logger">;
@@ -54,17 +58,26 @@ export function parseConfig(config: Record<string, string | undefined>): ParseCo
5458
return { ok: false, error: z.prettifyError(result.error) };
5559
}
5660

61+
const unsafeAllowInsecureEndpoints = result.data.NODE_ENV === "development"; // Enforce secure endpoints in production
5762
const parsedConfig: ParsedConfig = {
58-
unsafeAllowInsecureEndpoints: result.data.NODE_ENV === "development", // Enforce secure endpoints in production
63+
rawConfig: result.data,
64+
unsafeAllowInsecureEndpoints,
5965
rpcEndpoint: result.data.RPC_ENDPOINT,
60-
hermesEndpoint: result.data.HERMES_ENDPOINT,
6166
contractAddress: result.data.CONTRACT_ADDRESS,
6267
walletSecret: result.data.WALLET_SECRET,
63-
updateIntervalMs: result.data.UPDATE_INTERVAL_MS,
6468
healthcheckPort: result.data.HEALTHCHECK_PORT,
6569
gasPrice: result.data.GAS_PRICE,
6670
denom: result.data.DENOM,
6771
priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE,
72+
smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS,
73+
priceProducerFactory(options: PriceProducerFactoryOptions) {
74+
return pollPriceStream({
75+
...options,
76+
unsafeAllowInsecureEndpoints,
77+
baseUrl: result.data.HERMES_ENDPOINT,
78+
pollingIntervalMs: result.data.UPDATE_INTERVAL_MS,
79+
});
80+
},
6881
createHermesClient: (cfg: HermesConfig) => HermesClient.connect(cfg),
6982
};
7083

src/cli-commands/daemon-command.test.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ describe("daemonCommand", () => {
4040
const promise = daemonCommand(config);
4141
await waitForServer(logger);
4242

43-
const reponse = await fetch(`http://localhost:${config.healthcheckPort}/health`);
43+
const port = getServerPort(logger);
44+
const reponse = await fetch(`http://localhost:${port}/health`);
4445
expect(reponse.status).toBe(200);
4546
expect(client.getStatus).toHaveBeenCalled();
4647
expect(logger.log).toHaveBeenCalledWith(
47-
`Health check endpoint available at http://localhost:${config.healthcheckPort}/health`,
48+
expect.stringMatching(/Health check endpoint available at http:\/\/localhost:\d+\/health/),
4849
);
49-
expect(logger.log).toHaveBeenCalledWith("Daemon started. Press Ctrl+C to stop.\n");
5050

5151
abortController.abort();
5252
await promise;
@@ -57,7 +57,8 @@ describe("daemonCommand", () => {
5757
const promise = daemonCommand(config);
5858
await waitForServer(logger);
5959

60-
const response = await fetch(`http://localhost:${config.healthcheckPort}/invalid`);
60+
const port = getServerPort(logger);
61+
const response = await fetch(`http://localhost:${port}/invalid`);
6162
expect(response.status).toBe(404);
6263

6364
abortController.abort();
@@ -74,7 +75,6 @@ describe("daemonCommand", () => {
7475

7576
expect(logger.log).toHaveBeenCalledWith("\n\nShutting down daemon...");
7677
expect(logger.log).toHaveBeenCalledWith("\nStopping health check server...");
77-
expect(logger.log).toHaveBeenCalledWith("Health check server stopped");
7878
});
7979

8080
it("stops server immediately if signal is already aborted on startup", async () => {
@@ -91,21 +91,31 @@ describe("daemonCommand", () => {
9191
});
9292
}
9393

94+
function getServerPort(logger: Console): number {
95+
const calls = (logger.log as ReturnType<typeof vi.fn>).mock.calls;
96+
const call = calls.find((c: unknown[]) => typeof c[0] === "string" && /localhost:\d+/.test(c[0] as string));
97+
const match = (call![0] as string).match(/localhost:(\d+)/);
98+
return parseInt(match![1], 10);
99+
}
100+
94101
let testAbortController: AbortController | null = null;
95102
function setup() {
96103
const client = mock<HermesClient>();
97-
client.getStatus.mockReturnValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" });
104+
client.getStatus.mockResolvedValue({ isRunning: true, contractAddress: "", priceFeedId: "", address: "" });
98105
const logger = mock<Console>();
99106
const abortController = new AbortController();
100107
testAbortController = abortController;
101108
const config: CommandConfig = {
102109
rpcEndpoint: "https://rpc.akashnet.net:443",
103110
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
104-
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
111+
walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" },
112+
priceProducerFactory: vi.fn(async function* () {}) as unknown as CommandConfig["priceProducerFactory"],
105113
logger,
106114
signal: abortController.signal,
107-
healthcheckPort: 3001,
115+
healthcheckPort: 0,
108116
createHermesClient: vi.fn(() => Promise.resolve(client)),
117+
smartContractConfigCacheTTLMs: 0,
118+
rawConfig: {} as CommandConfig["rawConfig"],
109119
};
110120
return { config, client, logger, abortController };
111121
}

src/cli-commands/daemon-command.ts

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
import http from "node:http";
2-
import { once } from "node:events";
32
import type { AddressInfo } from "node:net";
43
import { prometheusExporter } from "../instrumentation/prometheus-exporter.ts";
54
import type { CommandConfig } from "./command-config.ts";
65

76
export async function daemonCommand(config: CommandConfig): Promise<void> {
7+
if (config.signal.aborted) return;
8+
89
config.logger?.log("Starting daemon mode...\n");
910

1011
const client = await config.createHermesClient(config);
1112
const server = http.createServer((req, res) => {
1213
if (req.method === "GET" && req.url === "/health") {
13-
const status = client.getStatus();
14-
res.writeHead(200, { "Content-Type": "application/json" });
15-
res.end(JSON.stringify(status));
14+
client.getStatus()
15+
.then((status) => {
16+
res.writeHead(200, { "Content-Type": "application/json" });
17+
res.end(JSON.stringify(status));
18+
})
19+
.catch((error) => {
20+
config.logger?.log(`Error fetching health status: ${error.message}`);
21+
res.writeHead(500, { "Content-Type": "application/json" });
22+
res.end();
23+
});
1624
} else if (req.method === "GET" && req.url === "/metrics") {
1725
prometheusExporter.getMetricsRequestHandler(req, res);
1826
} else {
@@ -23,33 +31,18 @@ export async function daemonCommand(config: CommandConfig): Promise<void> {
2331
const abort = () => {
2432
config.logger?.log("\n\nShutting down daemon...");
2533
config.logger?.log("\nStopping health check server...");
26-
return new Promise<void>((resolve) => {
27-
server.close((err) => {
28-
if (err) {
29-
config.logger?.log(`Error stopping health check server: ${err.message}`);
30-
}
31-
resolve();
32-
config.logger?.log("Health check server stopped");
33-
});
34-
});
3534
};
3635
config.signal.addEventListener("abort", abort, { once: true });
37-
await client.start({ signal: config.signal });
38-
await new Promise<void>((resolve, reject) => {
39-
if (config.signal.aborted) return resolve();
40-
server.once("error", reject);
41-
server.listen(config.healthcheckPort, () => {
42-
resolve();
43-
server.off("error", reject);
44-
if (!config.signal.aborted) {
36+
await Promise.all([
37+
client.start({ signal: config.signal }),
38+
new Promise<void>((resolve, reject) => {
39+
server.once("error", reject);
40+
server.listen({ port: config.healthcheckPort, signal: config.signal }, () => {
4541
config.logger?.log(`Health check endpoint available at http://localhost:${(server.address() as AddressInfo).port}/health`);
46-
}
47-
});
48-
});
49-
if (config.signal.aborted && server.listening) {
50-
await abort();
51-
} else if (server.listening) {
52-
config.logger?.log("Daemon started. Press Ctrl+C to stop.\n");
53-
await once(server, "close");
54-
}
42+
server.off("error", reject);
43+
server.once("close", resolve);
44+
});
45+
}),
46+
]);
47+
config.signal.removeEventListener("abort", abort);
5548
}

src/cli-commands/status-command.test.ts

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import EventEmitter from "node:events";
21
import { describe, it, expect, vi } from "vitest";
32
import { mock } from "vitest-mock-extended";
43
import type { HermesClient } from "../hermes-client.ts";
@@ -8,22 +7,22 @@ import { statusCommand } from "./status-command.ts";
87
function setup() {
98
const client = mock<HermesClient>();
109
const logger = mock<Console>();
11-
const config: CommandConfig = {
10+
const config = {
1211
rpcEndpoint: "https://rpc.akashnet.net:443",
1312
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
14-
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
15-
hermesEndpoint: "https://hermes.pyth.network",
13+
rawConfig: {
14+
HERMES_ENDPOINT: "https://hermes.pyth.network",
15+
},
1616
logger,
17-
process: new EventEmitter(),
1817
createHermesClient: vi.fn(() => Promise.resolve(client)),
19-
};
18+
} as unknown as CommandConfig;
2019
return { config, client, logger };
2120
}
2221

2322
describe("statusCommand", () => {
2423
it("displays client status information", async () => {
2524
const { config, client, logger } = setup();
26-
client.getStatus.mockReturnValueOnce({
25+
client.getStatus.mockResolvedValueOnce({
2726
address: "akash1sender",
2827
contractAddress: "akash1contract",
2928
priceFeedId: "feed-123",
@@ -41,7 +40,7 @@ describe("statusCommand", () => {
4140

4241
it("displays running status as yes when client is running", async () => {
4342
const { config, client, logger } = setup();
44-
client.getStatus.mockReturnValueOnce({
43+
client.getStatus.mockResolvedValueOnce({
4544
address: "akash1sender",
4645
contractAddress: "akash1contract",
4746
priceFeedId: "feed-123",
@@ -55,7 +54,7 @@ describe("statusCommand", () => {
5554

5655
it("displays RPC and Hermes endpoints from config", async () => {
5756
const { config, client, logger } = setup();
58-
client.getStatus.mockReturnValueOnce({
57+
client.getStatus.mockResolvedValueOnce({
5958
address: "akash1sender",
6059
contractAddress: "akash1contract",
6160
priceFeedId: "feed-123",
@@ -70,8 +69,8 @@ describe("statusCommand", () => {
7069

7170
it("uses default Hermes endpoint when not configured", async () => {
7271
const { config, client, logger } = setup();
73-
delete config.hermesEndpoint;
74-
client.getStatus.mockReturnValueOnce({
72+
(config.rawConfig as Record<string, unknown>).HERMES_ENDPOINT = "https://hermes.pyth.network";
73+
client.getStatus.mockResolvedValueOnce({
7574
address: "akash1sender",
7675
contractAddress: "akash1contract",
7776
priceFeedId: "feed-123",

src/cli-commands/status-command.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export async function statusCommand(config: CommandConfig): Promise<void> {
44
config.logger?.log("Contract Status...\n");
55

66
const client = await config.createHermesClient(config);
7-
const status = client.getStatus();
7+
const status = await client.getStatus();
88

99
config.logger?.log("Client Status:");
1010
config.logger?.log("─────────────────────────────");
@@ -13,5 +13,5 @@ export async function statusCommand(config: CommandConfig): Promise<void> {
1313
config.logger?.log(`Price Feed ID: ${status.priceFeedId}`);
1414
config.logger?.log(`Running: ${status.isRunning ? "yes" : "no"}`);
1515
config.logger?.log(`RPC Endpoint: ${config.rpcEndpoint}`);
16-
config.logger?.log(`Hermes Endpoint: ${config.hermesEndpoint || "https://hermes.pyth.network"}`);
16+
config.logger?.log(`Hermes Endpoint: ${config.rawConfig.HERMES_ENDPOINT}`);
1717
}

src/cli-commands/update-command.test.ts

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,45 @@
1-
import EventEmitter from "node:events";
2-
import { describe, it, expect, vi } from "vitest";
1+
import { describe, expect, it, vi } from "vitest";
32
import { mock } from "vitest-mock-extended";
43
import type { HermesClient } from "../hermes-client.ts";
4+
import type { PriceUpdate } from "../types.ts";
55
import type { CommandConfig } from "./command-config.ts";
66
import { updateCommand } from "./update-command.ts";
77

8+
const fakePriceUpdate: PriceUpdate = {
9+
priceData: {
10+
id: "test-feed-id",
11+
price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
12+
ema_price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
13+
},
14+
vaa: "dGVzdC12YWE=",
15+
};
16+
17+
async function* fakePriceProducer(): AsyncGenerator<PriceUpdate, void, unknown> {
18+
yield fakePriceUpdate;
19+
}
20+
821
function setup() {
922
const client = mock<HermesClient>();
23+
client.queryConfig.mockResolvedValue({
24+
admin: "akash1admin",
25+
wormhole_contract: "akash1wormhole",
26+
update_fee: "1",
27+
price_feed_id: "test-feed-id",
28+
default_denom: "uakt",
29+
default_base_denom: "uakt",
30+
data_sources: [],
31+
});
1032
const logger = mock<Console>();
1133
const config: CommandConfig = {
1234
rpcEndpoint: "https://rpc.akashnet.net:443",
1335
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
14-
mnemonic: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about",
36+
walletSecret: { type: "mnemonic", value: "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about" },
1537
logger,
16-
process: new EventEmitter(),
38+
signal: AbortSignal.abort(),
39+
healthcheckPort: 3000,
40+
rawConfig: {} as CommandConfig["rawConfig"],
41+
smartContractConfigCacheTTLMs: 60000,
42+
priceProducerFactory: vi.fn(() => fakePriceProducer()),
1743
createHermesClient: vi.fn(() => Promise.resolve(client)),
1844
};
1945
return { config, client, logger };
@@ -33,7 +59,7 @@ describe("updateCommand", () => {
3359
await updateCommand(config);
3460

3561
expect(config.createHermesClient).toHaveBeenCalledWith(config);
36-
expect(client.updatePrice).toHaveBeenCalledOnce();
62+
expect(client.updatePrice).toHaveBeenCalledWith(fakePriceUpdate);
3763
});
3864

3965
it("propagates errors from updatePrice", async () => {

0 commit comments

Comments
 (0)