Skip to content

Commit 6365800

Browse files
committed
feat: adds support for price SSE stream
1 parent 280c823 commit 6365800

10 files changed

Lines changed: 613 additions & 42 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}'
175175
| `WALLET_SECRET` | Yes | - | Either `privateKey:<private key in hex format>` or `mnemonic:<12/24 words>` |
176176
| `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API |
177177
| `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) |
178+
| `PRICE_FETCHING_METHOD` | No | polling | `polling` or `sse` |
178179
| `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) |
179180
| `GAS_PRICE` | No | `0.025uakt` | Gas price |
180181
| `DENOM` | No | `uakt` | Token denomination |

package-lock.json

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"@opentelemetry/resources": "^2.5.0",
4545
"@opentelemetry/sdk-node": "^0.211.0",
4646
"commander": "^12.0.0",
47+
"fetch-event-stream": "^0.1.6",
4748
"zod": "^4.3.6"
4849
},
4950
"devDependencies": {

src/cli-commands/command-config.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { z } from "zod";
22
import { HermesClient, type HermesConfig } from "../hermes-client.ts";
33
import { validateContractAddress, validateWalletSecret } from "../validation.ts";
44
import type { PriceProducerFactoryOptions } from "../types.ts";
5-
import { pollPriceStream } from "../price-stream/polling-price-stream.ts";
5+
import { pollPriceStream } from "../price-stream/polling-price-stream/polling-price-stream.ts";
6+
import { priceSSEStream } from "../price-stream/price-sse-stream/price-sse-stream.ts";
67

78
export interface CommandConfig extends HermesConfig {
89
createHermesClient: (config: HermesConfig) => Promise<HermesClient>;
@@ -41,6 +42,7 @@ const configSchema = z.object({
4142
});
4243
}
4344
}).optional(),
45+
PRICE_FETCHING_METHOD: z.enum(["polling", "sse"]).default("polling"),
4446
UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds
4547
HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000),
4648
GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"),
@@ -71,6 +73,13 @@ export function parseConfig(config: Record<string, string | undefined>): ParseCo
7173
priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE,
7274
smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS,
7375
priceProducerFactory(options: PriceProducerFactoryOptions) {
76+
if (result.data.PRICE_FETCHING_METHOD === "sse") {
77+
return priceSSEStream({
78+
...options,
79+
unsafeAllowInsecureEndpoints,
80+
baseUrl: result.data.HERMES_ENDPOINT,
81+
});
82+
}
7483
return pollPriceStream({
7584
...options,
7685
unsafeAllowInsecureEndpoints,

src/hermes-client.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,17 @@ export class HermesClient {
528528
const consumePrices = async () => {
529529
for await (const priceUpdate of priceStream) {
530530
priceUpdates.set(priceUpdate);
531+
532+
const price = priceUpdate.priceData.price;
533+
this.#logger?.log(
534+
`Received price from Hermes: ${price.price} (expo: ${price.expo})`,
535+
);
536+
this.#logger?.log(
537+
` Confidence: ${price.conf}, Publish time: ${price.publish_time}`,
538+
);
539+
this.#logger?.log(
540+
` VAA size: ${priceUpdate.vaa.length} bytes (base64)`,
541+
);
531542
}
532543
controller.abort();
533544
};

src/price-stream/polling-price-stream.test.ts renamed to src/price-stream/polling-price-stream/polling-price-stream.test.ts

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, expect, it, vi } from "vitest";
2-
import type { HermesResponse } from "../types.ts";
2+
import type { HermesResponse } from "../../types.ts";
33
import { pollPriceStream, type PollPriceStreamOptions } from "./polling-price-stream.ts";
44

55
describe("pollPriceStream", () => {
@@ -106,22 +106,6 @@ describe("pollPriceStream", () => {
106106
expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] });
107107
});
108108

109-
it("logs price details on successful fetch", async () => {
110-
const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() };
111-
const data = createHermesResponse();
112-
const options = createOptions({
113-
fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse(data)),
114-
logger,
115-
});
116-
117-
const gen = pollPriceStream(options);
118-
await gen.next();
119-
120-
expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Fetched price from Hermes: 1000"));
121-
expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Confidence: 10"));
122-
expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("VAA size:"));
123-
});
124-
125109
it("polls repeatedly yielding updates", async () => {
126110
const data1 = createHermesResponse();
127111
const data2 = createHermesResponse({

src/price-stream/polling-price-stream.ts renamed to src/price-stream/polling-price-stream/polling-price-stream.ts

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import https from "node:https";
33
import { performance } from "node:perf_hooks";
44
import { Readable } from "node:stream";
55
import { setTimeout as delay } from "node:timers/promises";
6-
import { hermesFetchDuration } from "../metrics.ts";
7-
import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate, PythPriceData } from "../types.ts";
8-
import { validateEndpointUrl } from "../validation.ts";
6+
import { hermesFetchDuration } from "../../metrics.ts";
7+
import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate } from "../../types.ts";
8+
import { validateEndpointUrl } from "../../validation.ts";
9+
import { parsePriceUpdate } from "../utils.ts";
910

1011
export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGenerator<PriceUpdate> {
1112
if (!options.priceFeedId) {
@@ -37,7 +38,6 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe
3738
continue;
3839
} finally {
3940
hermesFetchDuration.record(performance.now() - fetchStart, { status });
40-
console.log(`Fetch from Hermes completed with status ${status} in ${performance.now() - fetchStart} ms`);
4141
}
4242

4343
if (!response.ok) {
@@ -48,32 +48,22 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe
4848
continue;
4949
}
5050

51-
const data = await response.json() as HermesResponse;
52-
53-
if (!data.parsed || data.parsed.length === 0) {
54-
options.logger?.error("No price data returned from Hermes");
51+
let parsedData: HermesResponse;
52+
try {
53+
parsedData = await response.json() as HermesResponse;
54+
} catch (error) {
55+
options.logger?.error(`Error parsing JSON from Hermes: ${(error as Error).message}`);
5556
continue;
5657
}
5758

58-
if (!data.binary?.data || data.binary.data.length === 0) {
59-
options.logger?.error("No VAA binary data returned from Hermes");
59+
const priceUpdateResult = parsePriceUpdate(parsedData);
60+
61+
if (!priceUpdateResult.ok) {
62+
options.logger?.error(priceUpdateResult.message);
6063
continue;
6164
}
6265

63-
const priceData: PythPriceData = data.parsed[0];
64-
const vaa: string = data.binary.data[0];
65-
66-
options.logger?.log(
67-
`Fetched price from Hermes: ${priceData.price.price} (expo: ${priceData.price.expo})`,
68-
);
69-
options.logger?.log(
70-
` Confidence: ${priceData.price.conf}, Publish time: ${priceData.price.publish_time}`,
71-
);
72-
options.logger?.log(
73-
` VAA size: ${vaa.length} bytes (base64)`,
74-
);
75-
76-
yield { priceData, vaa };
66+
yield priceUpdateResult.value;
7767
if (options.pollingIntervalMs > 0) {
7868
await delay(options.pollingIntervalMs, undefined, { signal: options.signal })
7969
.catch((error) => options.logger?.warn(`Polling delay interrupted: ${(error as Error).message}`));

0 commit comments

Comments
 (0)