@@ -2,7 +2,7 @@ import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumen
22import { Logger , waitAndCall , waitFor , YagnaApi } from "../shared/utils" ;
33import { MarketModuleImpl } from "./market.module" ;
44import { Demand , DemandSpecification } from "./demand" ;
5- import { Subject , take } from "rxjs" ;
5+ import { Observable , Subject , take } from "rxjs" ;
66import { MarketProposalEvent , OfferProposal , ProposalProperties } from "./proposal" ;
77import { MarketApiAdapter } from "../shared/yagna/" ;
88import { IActivityApi , IFileServer } from "../activity" ;
@@ -428,8 +428,82 @@ describe("Market module", () => {
428428
429429 verify ( mockMarketApiAdapter . counterProposal ( initialProposal , spec ) ) . once ( ) ;
430430 } ) ;
431- } ) ;
431+ it ( "should retry collecting proposals after an error" , async ( ) => {
432+ jest . useRealTimers ( ) ;
433+
434+ const spec = new DemandSpecification (
435+ {
436+ properties : [ ] ,
437+ constraints : [ ] ,
438+ } ,
439+ "erc20-holesky-tglm" ,
440+ ) ;
441+
442+ const mockProposal = mock ( OfferProposal ) ;
443+ when ( mockProposal . isValid ( ) ) . thenReturn ( true ) ;
444+ when ( mockProposal . isDraft ( ) ) . thenReturn ( true ) ;
445+ when ( mockProposal . isInitial ( ) ) . thenReturn ( false ) ;
446+ when ( mockProposal . provider ) . thenReturn ( {
447+ id : "test-provider-id" ,
448+ name : "test-provider-name" ,
449+ walletAddress : "0xTestWallet" ,
450+ } ) ;
451+ when ( mockProposal . properties ) . thenReturn ( initialOfferProperties ) ;
452+ when ( mockProposal . pricing ) . thenReturn ( {
453+ cpuSec : 0.4 / 3600 ,
454+ envSec : 0.4 / 3600 ,
455+ start : 0.4 ,
456+ } ) ;
457+ const validProposal = instance ( mockProposal ) ;
458+
459+ // Fail for 10 times in a row and then succeed
460+ let attempts = 0 ;
461+ const demandOfferEvent$ = new Observable < MarketProposalEvent > ( ( subscriber ) => {
462+ if ( attempts < 10 ) {
463+ attempts ++ ;
464+ subscriber . error ( new Error ( "Temporary error" ) ) ;
465+ } else {
466+ subscriber . next ( {
467+ type : "ProposalReceived" ,
468+ proposal : validProposal ,
469+ timestamp : new Date ( ) ,
470+ } ) ;
471+ subscriber . complete ( ) ;
472+ }
473+ } ) ;
474+
475+ when ( mockMarketApiAdapter . collectMarketProposalEvents ( _ ) ) . thenReturn ( demandOfferEvent$ ) ;
432476
477+ const draftProposal$ = marketModule . collectDraftOfferProposals ( {
478+ demandSpecification : spec ,
479+ proposalsBatchReleaseTimeoutMs : 1 ,
480+ minProposalsBatchSize : 1 ,
481+ pricing : {
482+ model : "linear" ,
483+ maxStartPrice : 0.5 ,
484+ maxCpuPerHourPrice : 1.0 ,
485+ maxEnvPerHourPrice : 0.5 ,
486+ } ,
487+ retryConfig : {
488+ delay : 10 ,
489+ } ,
490+ } ) ;
491+
492+ const draftListener = jest . fn ( ) ;
493+ const errorListener = jest . fn ( ) ;
494+
495+ const testSub = draftProposal$ . subscribe ( {
496+ next : draftListener ,
497+ error : errorListener ,
498+ } ) ;
499+
500+ await waitFor ( ( ) => draftListener . mock . calls . length > 0 ) ;
501+ testSub . unsubscribe ( ) ;
502+
503+ expect ( draftListener ) . toHaveBeenCalledWith ( validProposal ) ;
504+ expect ( errorListener ) . not . toHaveBeenCalled ( ) ;
505+ } ) ;
506+ } ) ;
433507 describe ( "signAgreementFromPool()" , ( ) => {
434508 beforeEach ( ( ) => {
435509 jest . useRealTimers ( ) ;
@@ -507,7 +581,7 @@ describe("Market module", () => {
507581
508582 describe ( "emitted events" , ( ) => {
509583 describe ( "agreement related events" , ( ) => {
510- test ( "Emits 'agreementConfirmed '" , ( ) => {
584+ test ( "Emits 'agreementApproved '" , ( ) => {
511585 // Given
512586 const agreement = instance ( mockAgreement ) ;
513587
0 commit comments