@@ -204,6 +204,36 @@ def on_work_done(task, status):
204204 workers : Set [asyncio .Task [None ]] = set ()
205205 last_wid = 0
206206
207+ agreements_to_pay : Set [str ] = set ()
208+ invoices : Dict [str , rest .payment .Invoice ] = dict ()
209+ payment_closing : bool = False
210+
211+ async def process_invoices ():
212+ assert self ._budget_allocation
213+ allocation : rest .payment .Allocation = self ._budget_allocation
214+ async for invoice in self ._payment_api .incoming_invoices ():
215+ if invoice .agreement_id in agreements_to_pay :
216+ agreements_to_pay .remove (invoice .agreement_id )
217+ await invoice .accept (amount = invoice .amount , allocation = allocation )
218+ else :
219+ invoices [invoice .agreement_id ] = invoice
220+ if payment_closing and not agreements_to_pay :
221+ break
222+
223+ async def accept_payment_for_agreement (agreement_id : str , * , partial = False ) -> bool :
224+ assert self ._budget_allocation
225+ allocation : rest .payment .Allocation = self ._budget_allocation
226+ emit_progress ("agr" , "payment_prep" , agreement_id )
227+ inv = invoices .get (agreement_id )
228+ if inv is None :
229+ agreements_to_pay .add (agreement_id )
230+ emit_progress ("agr" , "payment_queued" , agreement_id )
231+ return False
232+ del invoices [agreement_id ]
233+ emit_progress ("agr" , "payment_accept" , agreement_id , invoice = inv )
234+ await inv .accept (amount = inv .amount , allocation = allocation )
235+ return True
236+
207237 async def _tmp_log ():
208238 while True :
209239 item = await event_queue .get ()
@@ -244,9 +274,7 @@ async def find_offers():
244274 # "test1",
245275 # auth=aiohttp.BasicAuth("alice", "secret1234"),
246276 # )
247- print ("pre" )
248277 storage_manager = await self ._stack .enter_async_context (gftp .provider ())
249- print ("post" )
250278
251279 async def start_worker (agreement : rest .market .Agreement ):
252280 nonlocal last_wid
@@ -271,7 +299,6 @@ async def task_emiter():
271299 work_context = WorkContext (f"worker-{ wid } " , storage_manager )
272300 async for batch in worker (work_context , task_emiter ()):
273301 await batch .prepare ()
274- print ("prepared" )
275302 cc = CommandContainer ()
276303 batch .register (cc )
277304 remote = await act .send (cc .commands ())
@@ -283,7 +310,9 @@ async def task_emiter():
283310 emit_progress ("wkr" , "get-results" , wid )
284311 await batch .post ()
285312 emit_progress ("wkr" , "bach-done" , wid )
313+ await accept_payment_for_agreement (agreement .id , partial = True )
286314
315+ await accept_payment_for_agreement (agreement .id )
287316 emit_progress ("wkr" , "done" , wid , agreement = agreement .id )
288317
289318 async def worker_starter ():
@@ -324,6 +353,7 @@ async def fill_work_q():
324353
325354 loop = asyncio .get_event_loop ()
326355 find_offers_task = loop .create_task (find_offers ())
356+ process_invoices_job = loop .create_task (process_invoices ())
327357 # Py38: find_offers_task.set_name('find_offers_task')
328358 try :
329359 task_fill_q = loop .create_task (fill_work_q ())
@@ -332,6 +362,7 @@ async def fill_work_q():
332362 loop .create_task (_tmp_log ()),
333363 task_fill_q ,
334364 loop .create_task (worker_starter ()),
365+ process_invoices_job ,
335366 }
336367 while (
337368 task_fill_q in services
@@ -344,20 +375,22 @@ async def fill_work_q():
344375 done , pending = await asyncio .wait (
345376 services .union (workers ), timeout = 10 , return_when = asyncio .FIRST_COMPLETED
346377 )
347- # print('done=', done)
348378 workers -= done
349379 services -= done
350- print ( " all work done")
380+ yield { "stage" : " all work done"}
351381 except Exception as e :
352382 print ("fail=" , e )
353383 finally :
384+ payment_closing = True
354385 for worker_task in workers :
355386 worker_task .cancel ()
356387 find_offers_task .cancel ()
357388 await asyncio .wait (
358389 workers .union ({find_offers_task }), timeout = 5 , return_when = asyncio .ALL_COMPLETED
359390 )
360- yield {}
391+ yield {"stage" : "wait for invoices" , "agreements_to_pay" : agreements_to_pay }
392+ payment_closing = True
393+ await asyncio .wait ({process_invoices_job }, timeout = 15 , return_when = asyncio .ALL_COMPLETED )
361394
362395 yield {"done" : True }
363396 pass
@@ -371,7 +404,6 @@ async def __aenter__(self):
371404 self ._market_api = rest .Market (market_client )
372405
373406 activity_client = await stack .enter_async_context (self ._api_config .activity ())
374- print (f"act_url={ self ._api_config .activity_url } " )
375407 self ._activity_api = rest .Activity (activity_client )
376408
377409 payment_client = await stack .enter_async_context (self ._api_config .payment ())
0 commit comments