Skip to content

Commit f745013

Browse files
committed
refactor: moves price producer creation into hermes client from update command
1 parent 9392948 commit f745013

8 files changed

Lines changed: 42 additions & 68 deletions

File tree

package.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
"name": "akash-hermes-client",
33
"version": "1.0.0",
44
"description": "Hermes client for updating Akash oracle with Pyth price data",
5-
"main": "dist/hermes-client.js",
6-
"types": "dist/hermes-client.d.ts",
75
"type": "module",
86
"bin": {
97
"hermes-cli": "dist/cli.js"

src/cli-commands/update-command.test.ts

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,9 @@
11
import { describe, expect, it, vi } from "vitest";
22
import { mock } from "vitest-mock-extended";
33
import type { HermesClient } from "../hermes-client.ts";
4-
import type { PriceUpdate } from "../types.ts";
54
import type { CommandConfig } from "./command-config.ts";
65
import { updateCommand } from "./update-command.ts";
76

8-
const fakePriceUpdate: PriceUpdate = {
9-
priceData: {
10-
id: "test-feed-id",
11-
price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
12-
ema_price: { price: "100", conf: "1", expo: -8, publish_time: 1000 },
13-
},
14-
vaa: "dGVzdC12YWE=",
15-
};
16-
17-
async function* fakePriceProducer(): AsyncGenerator<PriceUpdate, void, unknown> {
18-
yield fakePriceUpdate;
19-
}
20-
217
function setup() {
228
const client = mock<HermesClient>();
239
client.queryConfig.mockResolvedValue({
@@ -39,7 +25,7 @@ function setup() {
3925
healthcheckPort: 3000,
4026
rawConfig: {} as CommandConfig["rawConfig"],
4127
smartContractConfigCacheTTLMs: 60000,
42-
priceProducerFactory: vi.fn(() => fakePriceProducer()),
28+
priceProducerFactory: vi.fn(),
4329
createHermesClient: vi.fn(() => Promise.resolve(client)),
4430
};
4531
return { config, client, logger };
@@ -51,15 +37,14 @@ describe("updateCommand", () => {
5137
await updateCommand(config);
5238

5339
expect(logger.log).toHaveBeenCalledWith("Updating oracle price...\n");
54-
expect(logger.log).toHaveBeenCalledWith("\nUpdate completed successfully!");
5540
});
5641

5742
it("creates client and calls updatePrice", async () => {
5843
const { config, client } = setup();
5944
await updateCommand(config);
6045

6146
expect(config.createHermesClient).toHaveBeenCalledWith(config);
62-
expect(client.updatePrice).toHaveBeenCalledWith(fakePriceUpdate);
47+
expect(client.updatePrice).toHaveBeenCalledWith({ signal: config.signal });
6348
});
6449

6550
it("propagates errors from updatePrice", async () => {

src/cli-commands/update-command.ts

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,5 @@ import type { CommandConfig } from "./command-config.ts";
33
export async function updateCommand(config: CommandConfig): Promise<void> {
44
config.logger?.log("Updating oracle price...\n");
55
const client = await config.createHermesClient(config);
6-
const smartCotractConfig = await client.queryConfig();
7-
const priceStream = config.priceProducerFactory({
8-
priceFeedId: smartCotractConfig.price_feed_id,
9-
logger: config.logger,
10-
signal: config.signal,
11-
});
12-
const priceUpdate = await priceStream.next();
13-
if (priceUpdate.value) {
14-
await client.updatePrice(priceUpdate.value);
15-
config.logger?.log("\nUpdate completed successfully!");
16-
} else {
17-
config.logger?.log("\nUpdate skipped because no new price was available.");
18-
}
6+
await client.updatePrice({ signal: config.signal });
197
}

src/hermes-client.ts

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import { SigningCosmWasmClient } from "@cosmjs/cosmwasm-stargate";
1515
import { DirectSecp256k1HdWallet, DirectSecp256k1Wallet, type OfflineDirectSigner } from "@cosmjs/proto-signing";
1616
import { GasPrice } from "@cosmjs/stargate";
17-
import { priceUpdateCounter, priceStaleness } from "./metrics.ts";
17+
import { priceUpdateCounter, blockchainPriceStaleness } from "./metrics.ts";
1818
import { latestValue } from "./price-stream/latest-value/latest-value.ts";
1919
import { PriceUpdateConfirmed } from "./price-update/price-update-confirmed/price-update-confirmed.ts";
2020
import type { Logger, PriceProducerFactory, PriceUpdate, PriceUpdater, PythPriceData } from "./types.ts";
@@ -153,23 +153,6 @@ interface OracleParamsResponse {
153153

154154
const DEFAULT_PRICE_DEVIATION_TOLERANCE: Required<HermesConfig>["priceDeviationTolerance"] = { type: "absolute", value: 0 };
155155

156-
export type ErrorCode = "insufficient_balance" | "timeout" | "connection_issue" | "unknown";
157-
158-
export function classifyError(error: unknown): ErrorCode {
159-
const message = error instanceof Error ? error.message : "";
160-
161-
if (/insufficient funds|insufficient fee/i.test(message)) {
162-
return "insufficient_balance";
163-
}
164-
if (/timeout|ETIMEDOUT/i.test(message)) {
165-
return "timeout";
166-
}
167-
if (/ECONNREFUSED|ECONNRESET|ENOTFOUND/i.test(message)) {
168-
return "connection_issue";
169-
}
170-
return "unknown";
171-
}
172-
173156
export class HermesClient {
174157
#cosmClient?: SigningCosmWasmClient;
175158
#wallet?: OfflineDirectSigner;
@@ -434,14 +417,21 @@ export class HermesClient {
434417
throw new Error("Client not initialized");
435418
}
436419

420+
if (this.#insufficientBalanceCooldownUntil !== null) {
421+
if (Date.now() < this.#insufficientBalanceCooldownUntil) {
422+
this.#logger.warn("Skipping price update: insufficient balance cooldown active");
423+
return;
424+
}
425+
this.#logger.log("Insufficient balance cooldown expired, retrying...");
426+
}
427+
437428
const startTime = performance.now();
438429

439430
try {
440431
const currentPrice = await this.queryCurrentPrice();
441432

442-
// Record staleness: how far behind on-chain is from Pyth
443433
const staleness = priceUpdate.priceData.price.publish_time - currentPrice.publish_time;
444-
priceStaleness.record(staleness);
434+
blockchainPriceStaleness.record(staleness);
445435

446436
if (this.#canIgnorePriceUpdate(priceUpdate.priceData, currentPrice)) {
447437
priceUpdateCounter.add(1, { result: "skipped" });
@@ -450,7 +440,6 @@ export class HermesClient {
450440

451441
const config = await this.queryConfig();
452442

453-
// Execute update
454443
this.#logger.log("Submitting VAA to Pyth contract...");
455444
this.#logger.log(` Wormhole contract: ${config.wormhole_contract}`);
456445
this.#priceUpdater ??= new PriceUpdateConfirmed(this.#getCosmClient());
@@ -469,6 +458,7 @@ export class HermesClient {
469458
this.#logger.log(` New price: ${price.price} (expo: ${price.expo})`);
470459
priceUpdateCounter.add(1, { result: "success" });
471460
this.#lastPriceUpdateAt = new Date().toISOString();
461+
this.#insufficientBalanceCooldownUntil = null;
472462
} catch (error) {
473463
// SEC-04: Sanitize error messages to prevent information leakage
474464
const errorCode = classifyError(error);
@@ -585,15 +575,7 @@ export class HermesClient {
585575
const updatePrices = async () => {
586576
for await (const priceUpdate of priceUpdates) {
587577
try {
588-
if (this.#insufficientBalanceCooldownUntil !== null) {
589-
if (Date.now() < this.#insufficientBalanceCooldownUntil) {
590-
this.#logger.warn("Skipping price update: insufficient balance cooldown active");
591-
continue;
592-
}
593-
this.#logger.log("Insufficient balance cooldown expired, retrying...");
594-
}
595578
await this.#updatePrice(priceUpdate);
596-
this.#insufficientBalanceCooldownUntil = null;
597579
} catch (error) {
598580
this.#logger.error("Error in scheduled update:", error);
599581
}
@@ -637,7 +619,23 @@ export class HermesClient {
637619
}
638620
}
639621

640-
// Export types for external use
641622
export type {
642623
ConfigResponse, DataSourceResponse, OracleParamsResponse, PriceFeedIdResponse, PriceFeedResponse, PriceResponse, RefreshOracleParamsMsg, TransferAdminMsg, UpdateFeeMsg,
643624
};
625+
626+
export type ErrorCode = "insufficient_balance" | "timeout" | "connection_issue" | "unknown";
627+
628+
export function classifyError(error: unknown): ErrorCode {
629+
const message = error instanceof Error ? error.message : "";
630+
631+
if (/insufficient funds|insufficient fee/i.test(message)) {
632+
return "insufficient_balance";
633+
}
634+
if (/timeout|ETIMEDOUT/i.test(message)) {
635+
return "timeout";
636+
}
637+
if (/ECONNREFUSED|ECONNRESET|ENOTFOUND/i.test(message)) {
638+
return "connection_issue";
639+
}
640+
return "unknown";
641+
}

src/metrics.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export const hermesFetchDuration = meter.createHistogram("hermes.price_fetch_dur
88
description: "Duration of Hermes API price fetch in milliseconds",
99
unit: "ms",
1010
});
11-
export const priceStaleness = meter.createGauge("hermes.price_staleness", {
11+
export const blockchainPriceStaleness = meter.createGauge("hermes.chain_price_staleness", {
1212
description: "How far behind the on-chain price is from the latest Pyth price",
1313
unit: "s",
1414
});

src/price-stream/polling-price-stream/polling-price-stream.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe
2727
while (!options.signal?.aborted) {
2828
const fetchStart = performance.now();
2929
response = undefined;
30+
status = 0;
3031
try {
31-
response = await fetch(`${options.baseUrl}/v2/updates/price/latest?${params.toString()}`);
32+
const timeoutSignal = AbortSignal.timeout(10_000);
33+
response = await fetch(`${options.baseUrl}/v2/updates/price/latest?${params.toString()}`, {
34+
signal: options.signal ? AbortSignal.any([options.signal, timeoutSignal]) : timeoutSignal,
35+
});
3236
status = response.status;
3337
} catch (error) {
3438
if (error instanceof Error && (error.name === "AbortError" || error.message === "AbortError")) {

src/price-stream/price-sse-stream/price-sse-stream.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ describe(priceSSEStream.name, () => {
7575
const gen = priceSSEStream(options);
7676
await gen.next();
7777

78-
expect(fetchMock.mock.calls[0][1]).toEqual(
79-
expect.objectContaining({ signal: controller.signal }),
80-
);
78+
const fetchOptions = fetchMock.mock.calls[0][1] as { signal: AbortSignal };
79+
expect(fetchOptions.signal).toBeInstanceOf(AbortSignal);
80+
expect(fetchOptions.signal.aborted).toBe(false);
8181
});
8282

8383
it("throws on non-ok HTTP response after max retries", async () => {

src/price-stream/price-sse-stream/price-sse-stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ export async function *priceSSEStream(options: PriceSSEStreamOptions): AsyncGene
3131
}
3232

3333
options.logger?.log(`Connecting to Hermes price stream at ${options.baseUrl}${lastEventId ? ` (Last-Event-ID: ${lastEventId})` : ""}...`);
34+
const timeoutSignal = AbortSignal.timeout(10_000);
3435
const response = await fetch(`${options.baseUrl}/v2/updates/price/stream?${params.toString()}`, {
3536
headers,
36-
signal: options.signal,
37+
signal: options.signal ? AbortSignal.any([options.signal, timeoutSignal]) : timeoutSignal,
3738
});
3839
if (!response.ok) {
3940
const statusText = response.status ? ` (HTTP ${response.status})` : "";

0 commit comments

Comments
 (0)