@@ -32,7 +32,7 @@ import Data.Array (sort)
32
32
import Data.Either (Either (..))
33
33
import Data.Functor.Tagged (Tagged , tagged , untagged )
34
34
import Data.Lens ((.~), (^.))
35
- import Data.Maybe (Maybe (..))
35
+ import Data.Maybe (Maybe (..), maybe )
36
36
import Data.Newtype (over )
37
37
import Data.Symbol (class IsSymbol )
38
38
import Data.Traversable (for_ , traverse )
@@ -178,48 +178,45 @@ filterProducer
178
178
=> MapRecordWithIndex fsList (ConstMapping ModifyFilter ) fs fs
179
179
=> MultiFilterStreamState fs
180
180
-> Transducer Void (Record fs ) Web3 (MultiFilterStreamState fs )
181
- filterProducer cs@(MultiFilterStreamState currentState) = do
182
- let -- hang out until the chain makes progress
183
- waitForMoreBlocks = do
184
- lift $ liftAff $ delay (Milliseconds 3000.0 )
185
- filterProducer cs
186
-
187
- -- resume the filter production
188
- continueTo maxEndBlock = do
189
- let
190
- endBlock = newTo maxEndBlock currentState.currentBlock currentState.windowSize
191
-
192
- modify :: forall (k :: Type ) (e :: k ). Filter e -> Filter e
193
- modify fltr =
194
- fltr # _fromBlock .~ BN currentState .currentBlock
195
- # _toBlock
196
- .~ BN endBlock
197
-
198
- fs' = hmap (ModifyFilter modify ) currentState .filters
199
- yieldT fs'
200
- filterProducer $ MultiFilterStreamState currentState { currentBlock = succ endBlock }
181
+ filterProducer cs@(MultiFilterStreamState currentState@{ windowSize, currentBlock, filters: currentFilters }) = do
201
182
chainHead <- lift eth_blockNumber
202
- -- if the chain head is less than the current block we want to process
203
- -- then wait until the chain progresses
204
- if chainHead < currentState .currentBlock then
205
- waitForMoreBlocks
206
- -- otherwise try make progress
207
- else case hfoldlWithIndex MultiFilterMinToBlock Latest currentState .filters of
208
- -- consume as many as possible up to the chain head
209
- Latest -> continueTo $ over BlockNumber (_ - fromInt currentState .trailBy ) chainHead
210
- -- if the original fitler ends at a specific block, consume as many as possible up to that block
211
- -- or terminate if we're already past it
212
- BN targetEnd ->
213
- let
214
- targetEnd' = min targetEnd $ over BlockNumber (_ - fromInt currentState .trailBy ) chainHead
215
- in
216
- if currentState .currentBlock <= targetEnd' then
217
- continueTo targetEnd'
218
- else
219
- pure cs
183
+ let
184
+ { nextEndBlock, finalBlock } = case hfoldlWithIndex MultiFilterMinToBlock Latest currentFilters of
185
+ Latest ->
186
+ { nextEndBlock: over BlockNumber (_ - fromInt currentState.trailBy) chainHead
187
+ , finalBlock: Nothing
188
+ }
189
+ BN targetEnd ->
190
+ let
191
+ nextAvailableBlock = over BlockNumber (_ - fromInt currentState.trailBy) chainHead
192
+ in
193
+ { nextEndBlock: min targetEnd nextAvailableBlock, finalBlock: Just targetEnd }
194
+ isFinished = maybe false (\final -> currentBlock > final) finalBlock
195
+ if isFinished then pure cs
196
+ else if chainHead < currentBlock then waitForMoreBlocks
197
+ else continueTo nextEndBlock
198
+
220
199
where
221
- newTo :: BlockNumber -> BlockNumber -> Int -> BlockNumber
222
- newTo upper current window = min upper $ over BlockNumber (_ + fromInt window ) current
200
+
201
+ waitForMoreBlocks = do
202
+ lift $ liftAff $ delay (Milliseconds 3000.0 )
203
+ filterProducer cs
204
+
205
+ -- resume the filter production
206
+ continueTo maxEndBlock = do
207
+ let
208
+ endBlock = min maxEndBlock $ over BlockNumber (_ + fromInt windowSize) currentBlock
209
+
210
+ modify :: forall (k :: Type ) (e :: k ). Filter e -> Filter e
211
+ modify fltr =
212
+ fltr # _fromBlock .~ BN currentBlock
213
+ # _toBlock .~ BN endBlock
214
+
215
+ fs' = hmap (ModifyFilter modify ) currentFilters
216
+ yieldT fs'
217
+ filterProducer $ MultiFilterStreamState currentState
218
+ { currentBlock = succ endBlock
219
+ }
223
220
224
221
succ :: BlockNumber -> BlockNumber
225
222
succ = over BlockNumber (_ + one )
@@ -456,6 +453,7 @@ stagger
456
453
-> Transducer i o m a
457
454
stagger osT =
458
455
let
459
- trickle = awaitForever \os -> for_ os yieldT
456
+ trickle = awaitForever \os ->
457
+ for_ os yieldT
460
458
in
461
459
fst <$> (osT =>= trickle)
0 commit comments