@@ -32,7 +32,7 @@ import Data.Array (sort)
3232import Data.Either (Either (..))
3333import Data.Functor.Tagged (Tagged , tagged , untagged )
3434import Data.Lens ((.~), (^.))
35- import Data.Maybe (Maybe (..))
35+ import Data.Maybe (Maybe (..), maybe )
3636import Data.Newtype (over )
3737import Data.Symbol (class IsSymbol )
3838import Data.Traversable (for_ , traverse )
@@ -178,48 +178,45 @@ filterProducer
178178 => MapRecordWithIndex fsList (ConstMapping ModifyFilter ) fs fs
179179 => MultiFilterStreamState fs
180180 -> Transducer Void (Record fs ) Web3 (MultiFilterStreamState fs )
181- filterProducer cs@(MultiFilterStreamState currentState) = do
182- let -- hang out until the chain makes progress
183- waitForMoreBlocks = do
184- lift $ liftAff $ delay (Milliseconds 3000.0 )
185- filterProducer cs
186-
187- -- resume the filter production
188- continueTo maxEndBlock = do
189- let
190- endBlock = newTo maxEndBlock currentState.currentBlock currentState.windowSize
191-
192- modify :: forall (k :: Type ) (e :: k ). Filter e -> Filter e
193- modify fltr =
194- fltr # _fromBlock .~ BN currentState .currentBlock
195- # _toBlock
196- .~ BN endBlock
197-
198- fs' = hmap (ModifyFilter modify ) currentState .filters
199- yieldT fs'
200- filterProducer $ MultiFilterStreamState currentState { currentBlock = succ endBlock }
181+ filterProducer cs@(MultiFilterStreamState currentState@{ windowSize, currentBlock, filters: currentFilters }) = do
201182 chainHead <- lift eth_blockNumber
202- -- if the chain head is less than the current block we want to process
203- -- then wait until the chain progresses
204- if chainHead < currentState .currentBlock then
205- waitForMoreBlocks
206- -- otherwise try make progress
207- else case hfoldlWithIndex MultiFilterMinToBlock Latest currentState .filters of
208- -- consume as many as possible up to the chain head
209- Latest -> continueTo $ over BlockNumber (_ - fromInt currentState .trailBy ) chainHead
210- -- if the original fitler ends at a specific block, consume as many as possible up to that block
211- -- or terminate if we're already past it
212- BN targetEnd ->
213- let
214- targetEnd' = min targetEnd $ over BlockNumber (_ - fromInt currentState .trailBy ) chainHead
215- in
216- if currentState .currentBlock <= targetEnd' then
217- continueTo targetEnd'
218- else
219- pure cs
183+ let
184+ { nextEndBlock, finalBlock } = case hfoldlWithIndex MultiFilterMinToBlock Latest currentFilters of
185+ Latest ->
186+ { nextEndBlock: over BlockNumber (_ - fromInt currentState.trailBy) chainHead
187+ , finalBlock: Nothing
188+ }
189+ BN targetEnd ->
190+ let
191+ nextAvailableBlock = over BlockNumber (_ - fromInt currentState.trailBy) chainHead
192+ in
193+ { nextEndBlock: min targetEnd nextAvailableBlock, finalBlock: Just targetEnd }
194+ isFinished = maybe false (\final -> currentBlock > final) finalBlock
195+ if isFinished then pure cs
196+ else if chainHead < currentBlock then waitForMoreBlocks
197+ else continueTo nextEndBlock
198+
220199 where
221- newTo :: BlockNumber -> BlockNumber -> Int -> BlockNumber
222- newTo upper current window = min upper $ over BlockNumber (_ + fromInt window ) current
200+
201+ waitForMoreBlocks = do
202+ lift $ liftAff $ delay (Milliseconds 3000.0 )
203+ filterProducer cs
204+
205+ -- resume the filter production
206+ continueTo maxEndBlock = do
207+ let
208+ endBlock = min maxEndBlock $ over BlockNumber (_ + fromInt windowSize) currentBlock
209+
210+ modify :: forall (k :: Type ) (e :: k ). Filter e -> Filter e
211+ modify fltr =
212+ fltr # _fromBlock .~ BN currentBlock
213+ # _toBlock .~ BN endBlock
214+
215+ fs' = hmap (ModifyFilter modify ) currentFilters
216+ yieldT fs'
217+ filterProducer $ MultiFilterStreamState currentState
218+ { currentBlock = succ endBlock
219+ }
223220
224221 succ :: BlockNumber -> BlockNumber
225222 succ = over BlockNumber (_ + one )
@@ -456,6 +453,7 @@ stagger
456453 -> Transducer i o m a
457454stagger osT =
458455 let
459- trickle = awaitForever \os -> for_ os yieldT
456+ trickle = awaitForever \os ->
457+ for_ os yieldT
460458 in
461459 fst <$> (osT =>= trickle)
0 commit comments