Skip to content

Commit 129ae27

Browse files
committed
feat: use sse to get balances
1 parent c35a88e commit 129ae27

File tree

7 files changed

+354
-12
lines changed

7 files changed

+354
-12
lines changed

apps/cowswap-frontend/src/modules/balancesAndAllowances/updaters/CommonPriorityBalancesAndAllowancesUpdater.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ export function CommonPriorityBalancesAndAllowancesUpdater(): ReactNode {
9191
chainId={sourceChainId}
9292
isBffSwitchedOn={isBffSwitchedOn}
9393
isBffEnabled={isBffEnabled}
94+
isSseEnabled={true}
9495
excludedTokens={priorityTokenAddresses}
9596
invalidateCacheTrigger={invalidateCacheTrigger}
9697
/>
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import { useCallback, useEffect, useRef, useState } from 'react'
2+
3+
import { BALANCES_SSE_URL } from '@cowprotocol/common-const'
4+
import { SupportedChainId } from '@cowprotocol/cow-sdk'
5+
6+
const RECONNECT_DELAY_MS = 3000
7+
const MAX_RECONNECT_ATTEMPTS = 5
8+
9+
export interface SseBalancesState {
10+
isConnected: boolean
11+
isLoading: boolean
12+
error: Error | null
13+
}
14+
15+
export interface UseSseBalancesParams {
16+
account: string | undefined
17+
chainId: SupportedChainId
18+
enabled: boolean
19+
tokensListsUrls: string[]
20+
customTokens?: string[]
21+
onAllBalances: (balances: Record<string, string>) => void
22+
onBalanceUpdate: (address: string, balance: string) => void
23+
onError?: (error: Error) => void
24+
}
25+
26+
const INITIAL_STATE: SseBalancesState = { isConnected: false, isLoading: false, error: null }
27+
28+
async function createSession(
29+
chainId: SupportedChainId,
30+
account: string,
31+
tokensListsUrls: string[],
32+
customTokens?: string[],
33+
): Promise<void> {
34+
const response = await fetch(`${BALANCES_SSE_URL}/${chainId}/sessions/${account}`, {
35+
method: 'POST',
36+
headers: { 'Content-Type': 'application/json' },
37+
body: JSON.stringify({ tokensListsUrls, customTokens }),
38+
})
39+
40+
if (!response.ok) {
41+
throw new Error(`Session creation failed: ${response.status}`)
42+
}
43+
}
44+
45+
interface EventHandlers {
46+
onAllBalances: (balances: Record<string, string>) => void
47+
onBalanceUpdate: (address: string, balance: string) => void
48+
onError?: (error: Error) => void
49+
onOpen: () => void
50+
onClose: () => void
51+
}
52+
53+
function setupEventSource(url: string, handlers: EventHandlers): EventSource {
54+
const { onAllBalances, onBalanceUpdate, onError, onOpen, onClose } = handlers
55+
const es = new EventSource(url)
56+
57+
es.onopen = onOpen
58+
59+
es.addEventListener('all_balances', (e: MessageEvent): void => {
60+
try {
61+
const { balances } = JSON.parse(e.data)
62+
if (balances && Object.keys(balances).length > 0) onAllBalances(balances)
63+
} catch {
64+
/* ignore */
65+
}
66+
})
67+
68+
es.addEventListener('balance_update', (e: MessageEvent): void => {
69+
try {
70+
const { address, balance } = JSON.parse(e.data)
71+
if (address && balance != null) onBalanceUpdate(address.toLowerCase(), balance)
72+
} catch {
73+
/* ignore */
74+
}
75+
})
76+
77+
es.addEventListener('error', (e: MessageEvent): void => {
78+
try {
79+
const { message, code } = JSON.parse(e.data)
80+
onError?.(new Error(`SSE Error ${code}: ${message}`))
81+
} catch {
82+
/* ignore */
83+
}
84+
})
85+
86+
es.onerror = (): void => {
87+
if (es.readyState === EventSource.CLOSED) onClose()
88+
}
89+
90+
return es
91+
}
92+
93+
export function useSseBalances(params: UseSseBalancesParams): SseBalancesState {
94+
const {
95+
account,
96+
chainId,
97+
enabled,
98+
tokensListsUrls,
99+
customTokens = [],
100+
onAllBalances,
101+
onBalanceUpdate,
102+
onError,
103+
} = params
104+
105+
const [state, setState] = useState<SseBalancesState>(INITIAL_STATE)
106+
const esRef = useRef<EventSource | null>(null)
107+
const attemptsRef = useRef(0)
108+
const timeoutRef = useRef<ReturnType<typeof setTimeout>>()
109+
110+
const cleanup = useCallback((): void => {
111+
esRef.current?.close()
112+
esRef.current = null
113+
clearTimeout(timeoutRef.current)
114+
}, [])
115+
116+
useEffect(() => {
117+
if (!enabled || !account || tokensListsUrls.length === 0) {
118+
cleanup()
119+
setState(INITIAL_STATE)
120+
return cleanup
121+
}
122+
123+
let cancelled = false
124+
125+
const connect = async (): Promise<void> => {
126+
if (cancelled) return
127+
cleanup()
128+
setState((s) => ({ ...s, isLoading: true, error: null }))
129+
130+
try {
131+
await createSession(chainId, account, tokensListsUrls, customTokens)
132+
} catch (e) {
133+
const error = e instanceof Error ? e : new Error('Session failed')
134+
setState({ ...INITIAL_STATE, error })
135+
onError?.(error)
136+
return
137+
}
138+
139+
if (cancelled) return
140+
141+
esRef.current = setupEventSource(`${BALANCES_SSE_URL}/sse/${chainId}/balances/${account}`, {
142+
onAllBalances,
143+
onBalanceUpdate,
144+
onError,
145+
onOpen: (): void => {
146+
attemptsRef.current = 0
147+
setState({ isConnected: true, isLoading: false, error: null })
148+
},
149+
onClose: (): void => {
150+
setState((s) => ({ ...s, isConnected: false }))
151+
if (attemptsRef.current < MAX_RECONNECT_ATTEMPTS) {
152+
attemptsRef.current++
153+
timeoutRef.current = setTimeout(() => void connect(), RECONNECT_DELAY_MS * 2 ** (attemptsRef.current - 1))
154+
} else {
155+
const err = new Error('SSE: Max reconnect attempts')
156+
setState((s) => ({ ...s, error: err }))
157+
onError?.(err)
158+
}
159+
},
160+
})
161+
}
162+
163+
void connect()
164+
return (): void => {
165+
cancelled = true
166+
cleanup()
167+
}
168+
}, [account, chainId, enabled, tokensListsUrls, customTokens, cleanup, onAllBalances, onBalanceUpdate, onError])
169+
170+
return state
171+
}

libs/balances-and-allowances/src/index.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1-
// Updater
1+
// Updaters
22
export { BalancesAndAllowancesUpdater } from './updaters/BalancesAndAllowancesUpdater'
33
export { PriorityTokensUpdater, PRIORITY_TOKENS_REFRESH_INTERVAL } from './updaters/PriorityTokensUpdater'
4+
export { BalancesBffUpdater } from './updaters/BalancesBffUpdater'
5+
export { BalancesRpcCallUpdater } from './updaters/BalancesRpcCallUpdater'
6+
export { BalancesSseUpdater } from './updaters/BalancesSseUpdater'
47

58
// Hooks
69
export { useTokensBalances } from './hooks/useTokensBalances'
@@ -14,15 +17,20 @@ export { useUpdateTokenBalance } from './hooks/useUpdateTokenBalance'
1417
export { useTokenAllowances } from './hooks/useTokenAllowances'
1518
export { useBalancesAndAllowances } from './hooks/useBalancesAndAllowances'
1619
export { useTradeSpenderAddress } from './hooks/useTradeSpenderAddress'
20+
export { useSseBalances } from './hooks/useSseBalances'
21+
22+
// State hooks
1723
export { useIsBffFailed } from './state/isBffFailedAtom'
18-
export { BalancesBffUpdater } from './updaters/BalancesBffUpdater'
19-
export { BalancesRpcCallUpdater } from './updaters/BalancesRpcCallUpdater'
20-
export type { BalancesAndAllowances } from './types/balances-and-allowances'
21-
export * from './utils/isBffSupportedNetwork'
24+
export { useIsSseFailed } from './state/isSseFailedAtom'
2225

2326
// Types
27+
export type { BalancesAndAllowances } from './types/balances-and-allowances'
2428
export type { BalancesState } from './state/balancesAtom'
2529
export type { AllowancesState } from './hooks/useTokenAllowances'
30+
export type { SseBalancesState, UseSseBalancesParams } from './hooks/useSseBalances'
31+
32+
// Utils
33+
export * from './utils/isBffSupportedNetwork'
2634

2735
// Consts
2836
export { DEFAULT_BALANCES_STATE } from './state/balancesAtom'
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { useSetAtom } from 'jotai'
2+
import { atom, useAtomValue } from 'jotai/index'
3+
4+
export const isSseFailedAtom = atom(false)
5+
6+
export function useIsSseFailed(): boolean {
7+
return useAtomValue(isSseFailedAtom)
8+
}
9+
10+
export function useSetIsSseFailed(): (value: boolean) => void {
11+
return useSetAtom(isSseFailedAtom)
12+
}

libs/balances-and-allowances/src/updaters/BalancesAndAllowancesUpdater.tsx

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { ReactNode, useEffect, useMemo } from 'react'
22

33
import { LpToken, NATIVE_CURRENCIES } from '@cowprotocol/common-const'
44
import type { SupportedChainId } from '@cowprotocol/cow-sdk'
5-
import { useAllActiveTokens } from '@cowprotocol/tokens'
5+
import { useAllActiveTokens, useListsEnabledState } from '@cowprotocol/tokens'
66

77
import ms from 'ms.macro'
88
import { SWRConfiguration } from 'swr'
@@ -11,11 +11,13 @@ import { BalancesBffUpdater } from './BalancesBffUpdater'
1111
import { BalancesCacheUpdater } from './BalancesCacheUpdater'
1212
import { BalancesResetUpdater } from './BalancesResetUpdater'
1313
import { BalancesRpcCallUpdater } from './BalancesRpcCallUpdater'
14+
import { BalancesSseUpdater } from './BalancesSseUpdater'
1415

1516
import { BASIC_MULTICALL_SWR_CONFIG } from '../consts'
1617
import { useNativeTokenBalance } from '../hooks/useNativeTokenBalance'
1718
import { useSwrConfigWithPauseForNetwork } from '../hooks/useSwrConfigWithPauseForNetwork'
1819
import { useUpdateTokenBalance } from '../hooks/useUpdateTokenBalance'
20+
import { useIsSseFailed } from '../state/isSseFailedAtom'
1921

2022
// A small gap between balances and allowances refresh intervals is needed to avoid high load to the node at the same time
2123
const RPC_BALANCES_SWR_CONFIG: SWRConfiguration = { ...BASIC_MULTICALL_SWR_CONFIG, refreshInterval: ms`31s` }
@@ -27,8 +29,12 @@ export interface BalancesAndAllowancesUpdaterProps {
2729
chainId: SupportedChainId
2830
invalidateCacheTrigger: number
2931
excludedTokens: Set<string>
32+
/** @deprecated Use isSseEnabled instead */
3033
isBffSwitchedOn: boolean
34+
/** @deprecated Use isSseEnabled instead */
3135
isBffEnabled?: boolean
36+
/** Enable SSE-based real-time balance updates */
37+
isSseEnabled?: boolean
3238
}
3339

3440
export function BalancesAndAllowancesUpdater({
@@ -38,8 +44,10 @@ export function BalancesAndAllowancesUpdater({
3844
isBffSwitchedOn,
3945
excludedTokens,
4046
isBffEnabled,
47+
isSseEnabled = false,
4148
}: BalancesAndAllowancesUpdaterProps): ReactNode {
4249
const updateTokenBalance = useUpdateTokenBalance()
50+
const isSseFailed = useIsSseFailed()
4351

4452
const allTokens = useAllActiveTokens()
4553
const { data: nativeTokenBalance } = useNativeTokenBalance(account, chainId)
@@ -55,31 +63,57 @@ export function BalancesAndAllowancesUpdater({
5563
}, [])
5664
}, [allTokens, chainId])
5765

66+
// Get enabled token list URLs for SSE
67+
const listsEnabledState = useListsEnabledState()
68+
const tokensListsUrls = useMemo(() => {
69+
return Object.entries(listsEnabledState)
70+
.filter(([, isEnabled]) => isEnabled === true)
71+
.map(([url]) => url)
72+
}, [listsEnabledState])
73+
5874
const rpcBalancesSwrConfig = useSwrConfigWithPauseForNetwork(chainId, account, RPC_BALANCES_SWR_CONFIG)
75+
76+
// Determine which updater to use
77+
const hasSseTokenLists = tokensListsUrls.length > 0
78+
const useSse = isSseEnabled && !isSseFailed && hasSseTokenLists
79+
const useBff = !isSseEnabled && isBffEnabled
80+
const useRpcFallback = (!isBffSwitchedOn || !isBffEnabled) && !useSse
81+
5982
// Add native token balance to the store as well
6083
useEffect(() => {
61-
if (isBffSwitchedOn) return
84+
if (isBffSwitchedOn || isSseEnabled) return
6285

6386
const nativeToken = NATIVE_CURRENCIES[chainId]
6487

6588
if (nativeToken && nativeTokenBalance) {
6689
updateTokenBalance(nativeToken.address, nativeTokenBalance)
6790
}
68-
}, [isBffSwitchedOn, nativeTokenBalance, chainId, updateTokenBalance])
69-
70-
const enableRpcFallback = !isBffSwitchedOn || !isBffEnabled
91+
}, [isBffSwitchedOn, isSseEnabled, nativeTokenBalance, chainId, updateTokenBalance])
7192

7293
return (
7394
<>
74-
{isBffEnabled && (
95+
{/* SSE-based real-time updates (preferred) */}
96+
{useSse && (
97+
<BalancesSseUpdater
98+
account={account}
99+
chainId={chainId}
100+
tokenAddresses={tokenAddresses}
101+
tokensListsUrls={tokensListsUrls}
102+
/>
103+
)}
104+
105+
{/* Legacy BFF polling (deprecated, for backward compatibility) */}
106+
{useBff && (
75107
<BalancesBffUpdater
76108
account={account}
77109
chainId={chainId}
78110
invalidateCacheTrigger={invalidateCacheTrigger}
79111
tokenAddresses={tokenAddresses}
80112
/>
81113
)}
82-
{enableRpcFallback && (
114+
115+
{/* RPC fallback when SSE/BFF fails or is disabled */}
116+
{(useRpcFallback || isSseFailed) && (
83117
<BalancesRpcCallUpdater
84118
account={account}
85119
chainId={chainId}
@@ -88,6 +122,7 @@ export function BalancesAndAllowancesUpdater({
88122
setLoadingState
89123
/>
90124
)}
125+
91126
<BalancesResetUpdater chainId={chainId} account={account} />
92127
<BalancesCacheUpdater chainId={chainId} account={account} excludedTokens={excludedTokens} />
93128
</>

0 commit comments

Comments
 (0)