Skip to content

Commit af644be

Browse files
committed
fix wormhole recheck pending journeys in trips
1 parent 6dee4fd commit af644be

File tree

3 files changed

+85
-15
lines changed

3 files changed

+85
-15
lines changed

packages/server/src/services/agents/wormhole/agent.ts

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@ import { LRUCache } from 'lru-cache'
22
import { Observer, Subscription } from 'rxjs'
33
import { ago } from '@/common/time.js'
44
import { createTypedEventEmitter, deepCamelize } from '@/common/util.js'
5-
import { WormholeIds, WormholeSupportedNetworks } from '@/services/agents/wormhole/types/chain.js'
5+
import {
6+
urnToChainId,
7+
WormholeIds,
8+
WormholeSupportedNetworks,
9+
} from '@/services/agents/wormhole/types/chain.js'
610
import { WormholescanClient } from '@/services/networking/apis/wormhole/client.js'
711
import { makeWormholeLevelStorage } from '@/services/networking/apis/wormhole/storage.js'
812
import {
@@ -26,6 +30,8 @@ import { mapOperationToJourney, NewJourneyWithAssets } from './mappers/index.js'
2630
import { TelemetryWormholeEventEmitter } from './telemetry/events.js'
2731
import { collectWormholeStats, wormholeAgentMetrics } from './telemetry/metrics.js'
2832

33+
const OPERATION_LOOKUP_WINDOW_MS = 5 * 60_000
34+
2935
function isChainSupported(chainId?: number): boolean {
3036
return chainId === undefined || WormholeSupportedNetworks.includes(chainId)
3137
}
@@ -157,25 +163,57 @@ export class WormholeAgent implements Agent {
157163

158164
this.#log.info('[agent:%s] rechecking pending journeys...', this.id)
159165

160-
const pendings = await this.#repository.getJourneysByStatus('sent', [...WormholeProtocols])
166+
const pendings = await this.#repository.getJourneysByStatus(['sent', 'waiting'], [...WormholeProtocols])
161167

162168
for (const journey of pendings) {
163-
try {
169+
await this.#recheckJourney(journey)
170+
}
171+
172+
this.#log.info('[agent:%s] pending recheck complete (%s items)', this.id, pendings.length)
173+
}
174+
175+
async #recheckJourney(journey: Journey) {
176+
try {
177+
if (this.#watcher.isWormholeId(journey.correlation_id)) {
164178
const op = await this.#watcher.fetchOperationById(journey.correlation_id)
165179
if (op) {
166180
await this.#onOperation(op)
167181
}
168-
} catch (err) {
169-
this.#log.warn(
170-
err,
171-
'[agent:%s] failed to recheck pending journey %s',
172-
this.id,
173-
journey.correlation_id,
174-
)
182+
return
175183
}
184+
185+
await this.#recheckBySearch(journey)
186+
} catch (err) {
187+
this.#log.warn(err, '[agent:%s] failed to recheck pending journey %s', this.id, journey.correlation_id)
176188
}
189+
}
177190

178-
this.#log.info('[agent:%s] pending recheck complete (%s items)', this.id, pendings.length)
191+
async #recheckBySearch(journey: Journey) {
192+
const isOriginLeg = isWormholeProtocol(journey.origin_protocol)
193+
const address = isOriginLeg ? journey.from : journey.to
194+
195+
const stop = journey.stops.find((s: any) => s.type === 'wormhole' || isWormholeProtocol(s.type))
196+
if (!stop) {
197+
return
198+
}
199+
200+
const sourceChain = urnToChainId(stop.from.chainId)
201+
const targetChain = urnToChainId(stop.to.chainId)
202+
203+
const from = new Date(journey.sent_at - OPERATION_LOOKUP_WINDOW_MS).toISOString()
204+
const to = new Date(journey.sent_at + OPERATION_LOOKUP_WINDOW_MS).toISOString()
205+
206+
const { operations } = await this.#watcher.fetchOperations({
207+
address,
208+
sourceChain,
209+
targetChain,
210+
from,
211+
to,
212+
})
213+
214+
for (const op of operations) {
215+
await this.#onOperation(op)
216+
}
179217
}
180218

181219
#onOperation = async (op: WormholeOperation) => {
@@ -192,8 +230,13 @@ export class WormholeAgent implements Agent {
192230
const journey = mapOperationToJourney(op, this.#repository.generateTripId.bind(this))
193231
if (this.#replacedJourneysCache.has(journey.correlation_id)) {
194232
this.#log.info('[agent:%s] Journey replaced for correlationId: %s', this.id, journey.correlation_id)
233+
return
195234
}
196235
const existingTrips = await this.#repository.getJourneyByTripId(journey.trip_id)
236+
if (existingTrips.every((t) => t.status === 'received')) {
237+
this.#log.info('[agent:%s] Existing trips already completed: %s', this.id, journey.trip_id)
238+
return
239+
}
197240
const existingJourney = await this.#repository.getJourneyByCorrelationId(journey.correlation_id)
198241
if (!existingJourney) {
199242
const { assets, ...journeyWithoutAssets } = journey
@@ -213,7 +256,7 @@ export class WormholeAgent implements Agent {
213256
return
214257
}
215258

216-
if (existingJourney.status !== journey.status) {
259+
if (existingJourney.status !== 'received' && existingJourney.status !== journey.status) {
217260
const update: JourneyUpdate = {}
218261
if (isWormholeProtocol(journey.destination_protocol) || journey.status !== 'received') {
219262
update.status = journey.status

packages/server/src/services/networking/apis/wormhole/client.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { WormholeOperation } from './types.js'
55

66
const DEFAULT_PAGE_SIZE = 25
77

8-
type WormholeOperationParams = {
8+
export type WormholeOperationParams = {
99
pageSize?: number
1010
from?: string // ISO time
1111
to?: string // ISO time
@@ -154,4 +154,13 @@ export class WormholescanClient {
154154
const op = await this.#api.get(`api/v1/operations/${urlId}`, { signal }).json<WormholeOperation>()
155155
return op
156156
}
157+
158+
isWormholeId(maybeWormholeId: WormholeId) {
159+
try {
160+
normalizeWormholeId(maybeWormholeId)
161+
return true
162+
} catch (_e) {
163+
return false
164+
}
165+
}
157166
}

packages/server/src/services/networking/apis/wormhole/watcher.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import pLimit from 'p-limit'
22
import { Observable } from 'rxjs'
33

44
import { JourneyStatus } from '@/services/agents/crosschain/index.js'
5-
import { WormholescanClient } from './client.js'
5+
import { WormholeOperationParams, WormholescanClient } from './client.js'
66
import { toStatus } from './status.js'
77
import { PersistentWatcherStorage } from './storage.js'
88
import { WormholeOperation } from './types.js'
@@ -86,6 +86,13 @@ export function makeWatcher(client: WormholescanClient, storage?: PersistentWatc
8686
return await client.fetchOperationById(id)
8787
}
8888

89+
async function fetchOperations(
90+
params: WormholeOperationParams & { page?: number },
91+
signal?: AbortSignal | null,
92+
): Promise<{ operations: WormholeOperation[]; total: number }> {
93+
return await client.fetchOperations(params, signal)
94+
}
95+
8996
async function fetchBatch(
9097
state: WatcherState,
9198
signal?: AbortSignal | null,
@@ -296,7 +303,18 @@ export function makeWatcher(client: WormholescanClient, storage?: PersistentWatc
296303
})
297304
}
298305

299-
return { fetchOperationById, operations$, loadInitialState, pendingCount: () => pending.size }
306+
function isWormholeId(maybeWormholeId: string) {
307+
return client.isWormholeId(maybeWormholeId)
308+
}
309+
310+
return {
311+
fetchOperationById,
312+
fetchOperations,
313+
operations$,
314+
loadInitialState,
315+
isWormholeId,
316+
pendingCount: () => pending.size,
317+
}
300318
}
301319

302320
export type WormholeWatcher = ReturnType<typeof makeWatcher>

0 commit comments

Comments
 (0)