11import { createLogger } from '@subsquid/logger'
2- import { CallOptions , RpcClient , RpcError , RpcProtocolError } from '@subsquid/rpc-client'
3- import { RpcCall , RpcErrorInfo } from '@subsquid/rpc-client/lib/interfaces'
4- import { GetBlock } from '@subsquid/solana-rpc-data'
2+ import { CallOptions , RetryError , RpcClient , RpcError , RpcProtocolError } from '@subsquid/rpc-client'
3+ import { RpcCall , RpcErrorInfo , RpcRequest } from '@subsquid/rpc-client/lib/interfaces'
4+ import { GetBlock , isVoteTransaction } from '@subsquid/solana-rpc-data'
5+ import { assertNotNull } from '@subsquid/util-internal'
56import {
67 array ,
78 B58 ,
@@ -49,10 +50,18 @@ export interface RpcApi {
4950
5051
5152export class Rpc implements RpcApi {
53+ private requests : ThresholdRequests
54+
5255 constructor (
5356 public readonly client : RpcClient ,
54- public readonly log = createLogger ( 'sqd:solana-rpc' )
55- ) { }
57+ public readonly txThreshold ?: number ,
58+ public readonly log = createLogger ( 'sqd:solana-rpc' ) ,
59+ ) {
60+ if ( this . txThreshold != null ) {
61+ assert ( this . txThreshold > 0 )
62+ }
63+ this . requests = new ThresholdRequests ( )
64+ }
5665
5766 call < T = any > ( method : string , params ?: any [ ] , options ?: CallOptions < T > ) : Promise < T > {
5867 return this . client . call ( method , params , options )
@@ -107,7 +116,7 @@ export class Rpc implements RpcApi {
107116 call [ i ] = { method : 'getBlock' , params}
108117 }
109118 return this . reduceBatchOnRetry < GetBlock | 'skipped' | null | undefined > ( call , {
110- validateResult : getResultValidator ( nullable ( GetBlock ) ) ,
119+ validateResult : ( result , req ) => this . validateGetBlockResult ( result , req ) ,
111120 validateError : captureNoBlockAtSlot
112121 } )
113122 }
@@ -132,6 +141,23 @@ export class Rpc implements RpcApi {
132141
133142 return pack . flat ( )
134143 }
144+
145+ validateGetBlockResult ( result : unknown , req : RpcRequest ) {
146+ let validator = getResultValidator ( nullable ( GetBlock ) )
147+ let block = validator ( result )
148+ if ( this . txThreshold && block != null && block . transactions != null ) {
149+ let transactions = block . transactions . filter ( tx => ! isVoteTransaction ( tx ) )
150+ if ( transactions . length < this . txThreshold ) {
151+ let slot = req . params ! [ 0 ] as any as number
152+ let retries = this . requests . get ( slot )
153+ if ( retries < 3 ) {
154+ this . requests . inc ( slot )
155+ throw new RetryError ( `transactions count is less than threshold: ${ transactions . length } < ${ this . txThreshold } ` )
156+ }
157+ }
158+ }
159+ return block
160+ }
135161}
136162
137163
@@ -152,3 +178,28 @@ function getResultValidator<V extends Validator>(validator: V): (result: unknown
152178 }
153179 }
154180}
181+
182+
183+ class ThresholdRequests {
184+ inner : Map < number , number >
185+
186+ constructor ( ) {
187+ this . inner = new Map ( )
188+ }
189+
190+ inc ( slot : number ) {
191+ if ( this . inner . size > 100 ) {
192+ let keys = this . inner . keys ( )
193+ for ( let i = 0 ; i < 20 ; i ++ ) {
194+ let res = keys . next ( )
195+ this . inner . delete ( assertNotNull ( res . value ) )
196+ }
197+ }
198+ let val = this . inner . get ( slot ) ?? 0
199+ this . inner . set ( slot , val + 1 )
200+ }
201+
202+ get ( slot : number ) {
203+ return this . inner . get ( slot ) ?? 0
204+ }
205+ }
0 commit comments