-
Notifications
You must be signed in to change notification settings - Fork 325
[OPDATA-4082] Add Coinpaprika State adapter #4058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
🦋 Changeset detectedLatest commit: 944e7d9 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
9e8c109
to
76c20cb
Compare
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
any reason why this is not a new endpoint on existing coinpaprika EA ? |
1156b6b
to
14c598a
Compare
Updated to use existing EA with new endpoint |
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
saw this pr randomly, there's an sse transport in the framework, what's the particular reason this adapter needs a custom one? |
@alejoberardino I investigated SseTransport but found it's designed around the usual EventSource/GET model (and optional separate sub/unsub/keepalive calls). The current design of Coinpaprika State API instead opens a single SSE connection via POST with a JSON body containing all pairs and expects reconnects when the pair set changes. Given that shape, SubscriptionTransport is a cleaner fit for batching pairs and managing one shared connection + reconnection logic. |
7a62e70
to
2842eb9
Compare
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide a sample input param and sample output
68944e9
to
e535a01
Compare
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
packages/sources/coinpaprika-state/test/integration/adapter.test.ts
Outdated
Show resolved
Hide resolved
9b18db7
to
c396c39
Compare
packages/sources/coinpaprika-state/src/endpoint/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
de77ca1
to
0f9ba62
Compare
packages/sources/coinpaprika-state/test/integration/adapter.test.ts
Outdated
Show resolved
Hide resolved
7ac26d5
to
0857f7e
Compare
packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts
Outdated
Show resolved
Hide resolved
0857f7e
to
f8da8ab
Compare
f8da8ab
to
944e7d9
Compare
await this.responseCache.write(this.name, [{ params: param, response }]) | ||
} | ||
|
||
async _handleRequest( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this ever be called?
* Single-connection SSE transport that batches all pairs into one POST and streams state_price ticks into the cache. | ||
* | ||
* */ | ||
export class CoinpaprikaStateTransport extends SubscriptionTransport<TransportTypes> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wondering, did you try extending StreamingTransport
and implementing streamHandler
?
At a glance it seems like that would manage subscriptions for you rather than rewriting havePairsChanged
.
BTW: Coinmetrics is a different WS-based EA that switches URLs for every new subscription, not sure if there is any inspiration to pull from it: https://github.com/smartcontractkit/external-adapters-js/blob/main/packages/sources/coinmetrics/src/transport/lwba.ts#L62-L75
return adapterSettings.WARMUP_SUBSCRIPTION_TTL | ||
} | ||
|
||
async close(): Promise<void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this used?
* - Supports multi-line "data:" blocks | ||
* - Ignores comment/heartbeat lines starting with ':' | ||
*/ | ||
export class SSEParser { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there no library that can do some of this for us?
providerIndicatedTimeUnixMs: blockTime * 1000, | ||
}, | ||
} | ||
logger.debug(`tick ${param.base}/${param.quote}=${statePrice} t=${blockTime}`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are 2 logs needed or is 1 debug enough?
})) | ||
|
||
logger.debug(`Opening single SSE connection for ${pairsArray.length} pairs`) | ||
logger.debug(`Pairs: ${pairsArray.map((p) => `${p.base}/${p.quote}`).join(', ')}`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this a duplicate log in backgroundHandler
?
const { response } = await this.requester.request<Readable>(key, req) | ||
const httpResp = response | ||
|
||
if (httpResp.status !== 200 || !httpResp.data) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe extract this section into an sseConnectionErrorHandler
function for clarity since this is already a long function
quote: 'USD', | ||
}) | ||
|
||
expect([504, 502]).toContain(response.statusCode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
504 or 502?
scope.done() | ||
}) | ||
|
||
it('pair-set change triggers reconnect immediately (first stream kept open)', async () => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the (first stream kept open) part correct here?
logger.debug(`Pairs: ${pairsArray.map((p) => `${p.base}/${p.quote}`).join(', ')}`) | ||
|
||
this.lastConnectionAttempt = Date.now() | ||
this.currentAbortController = new AbortController() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could investigate whether it's worth including this in a superclass over the requester (something like AbortableRequester
). Might not be worth the effort but could save some of this complexity for setting/unsetting the AbortController
const delay = Math.max(this.reconnectDelay - since, 0) + Math.floor(Math.random() * 1000) | ||
logger.info(`SSE ended; reconnecting in ${delay} ms...`) | ||
await sleep(delay) | ||
await this.createSSEConnection(context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we be doing this here or just let the next backgroundHandler invocation start a new connection?
In fact, could also extract the management of timing these connections into some SSEConnectionManager class or similar
Description
Adds a new Coinpaprika State external adapter that streams state_price via Server-Sent Events (SSE).
See package README for full details
Changes
@chainlink/coinpaprika-state-adapter
coinpaprika-state
(alias:state
)Sample Input and Output
Sample Input
Using the alias
state
Without endpoint (uses default)
Sample Output
Example cURL Requests
Using the primary endpoint
coinpaprika-state
Using the alias
state
Without endpoint (uses default)
Steps to Test
yarn workspace @chainlink/coinpaprika-state-adapter build
yarn test packages/sources/coinpaprika-state/test/integration/adapter.test.ts
yarn test packages/sources/coinpaprika-state/test/unit/sse.test.ts
Quality Assurance
infra-k8s
configuration file.adapter-secrets
configuration file or update the soak testing blacklist.test-payload.json
file with relevant requests.feature/x
,chore/x
,release/x
,hotfix/x
,fix/x
) or is created from Jira.