@@ -33,6 +33,7 @@ def __init__(self, api_key: str, wallets: list[dict], chain_id: str, **kwargs):
3333 }
3434 for wallet in self .wallets
3535 }
36+ self .is_balance_stream = self .name .endswith ('balance' )
3637
3738 def stream_slices (self , sync_mode : SyncMode , cursor_field : List [str ] = None , stream_state : Mapping [str , Any ] = None ) -> Iterable [Optional [Mapping [str , Any ]]]:
3839
@@ -56,7 +57,8 @@ def stream_slices(self, sync_mode: SyncMode, cursor_field: List[str] = None, str
5657
5758 for wallet in self .wallets :
5859 selected = self .historical_mapping [wallet ["address" ]]
59- self .logger .debug (f"{ self .name } > stream_slice: Fetching data for { wallet ['name' ]} from { selected ['start_date' ]} to { selected ['end_date' ]} " )
60+ msg = f"{ self .name } > stream_slice: Fetching data for { wallet ['name' ]} " + ("" if self .is_balance_stream else f" from { selected ['start_date' ]} to { selected ['end_date' ]} " )
61+ self .logger .info (msg )
6062 time .sleep (self .sleep_seconds )
6163 yield {
6264 "address" : wallet ["address" ],
@@ -78,11 +80,18 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
7880 match = re .search (r"\((\d+)\s*/" , str (result ))
7981 seconds = int (match .group (1 ))
8082
83+ elif self .is_balance_stream :
84+ seconds = 1
85+
8186 self .logger .info (f"{ self .name } > backoff_time: { seconds } s" )
8287 return seconds
8388
8489 def next_page_token (self , response : requests .Response ):
8590
91+ if self .is_balance_stream :
92+ time .sleep (self .sleep_seconds )
93+ return None
94+
8695 result : Union [list [dict ], str ] = response .json ().get ("result" , [])
8796
8897 wallet_address = self .get_params (response ).get ("address" )
@@ -123,10 +132,14 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[
123132 "module" : "account" ,
124133 "offset" : self .pagination_offset
125134 }
135+ if self .is_balance_stream :
136+ params .pop ("sort" )
137+ params .pop ("offset" )
138+
126139 if next_page_token :
127140 params ["page" ] = next_page_token ["page" ]
128141
129- self .logger .debug (f"{ self .name } > request_params: { params } " )
142+ self .logger .info (f"{ self .name } > request_params: { params } " )
130143 return params
131144
132145 def to_datetime (self , timestamp : str ) -> datetime .datetime :
@@ -204,7 +217,6 @@ def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_sl
204217
205218 yield point
206219
207-
208220class WalletInternalTransactions (EtherscanStream ):
209221 """
210222 This refers to a transfer of ETH that is carried out through a smart contract as an intermediary.
@@ -261,7 +273,6 @@ def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_sl
261273
262274 yield point
263275
264-
265276class WalletTokenTransactions (EtherscanStream ):
266277 """
267278 Transactions of ERC-20 or ERC-721 tokens are labelled as Token Transfer transactions.
@@ -336,6 +347,43 @@ def parse_response(self, response, *, stream_state: Mapping[str, Any], stream_sl
336347
337348 yield point
338349
350+ class NativeBalance (EtherscanStream ):
351+ """
352+ Native balance (ETH) for the wallets
353+
354+ NOTE: Used for debugging purposes
355+ """
356+ primary_key = None
357+ cursor_field = []
358+
359+ def __init__ (self , api_key : str , wallets : list [dict ], chain_id : str , ** kwargs ):
360+ super ().__init__ (api_key , wallets , chain_id , ** kwargs )
361+
362+ def request_params (self , stream_state : Mapping [str , Any ], stream_slice : Mapping [str , any ] = None , next_page_token : Mapping [str , Any ] = None ) -> MutableMapping [str , Any ]:
363+ params = {
364+ ** super ().request_params (stream_state , stream_slice , next_page_token ),
365+ "action" : "balance" ,
366+ }
367+ return params
368+
369+ def parse_response (self , response , * , stream_state : Mapping [str , Any ], stream_slice : Optional [Mapping [str , Any ]] = None , next_page_token : Optional [Mapping [str , Any ]] = None ):
370+
371+ data : dict = response .json ()
372+ params = self .get_params (response )
373+ wallet_address = params ["address" ]
374+ wallet = self .wallet_info [wallet_address ]
375+ point = {
376+ "timestamp" : datetime .datetime .now (),
377+ "wallet_address" : wallet_address ,
378+ "wallet_name" : wallet ["name" ],
379+ "tags" : wallet ["tags" ],
380+ "token_symbol" : "ETH" ,
381+ "token_decimal" : self .ETHEREUM_DECIMALS ,
382+ "amount" : data ["result" ],
383+ "chain_id" : int (self .chain_id )
384+ }
385+ yield point
386+
339387class SourceEtherscan (AbstractSource ):
340388
341389 url = "https://api.etherscan.io/v2/api"
@@ -379,5 +427,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
379427 WalletTransactions (** params ),
380428 WalletInternalTransactions (** params ),
381429 WalletTokenTransactions (** params ),
430+ NativeBalance (** params )
382431 ]
383432 return streams
0 commit comments