77use Oliverde8 \PhpEtlBundle \Exception \UnknownChainException ;
88use Oliverde8 \PhpEtlBundle \Factory \ChainFactory ;
99use Oliverde8 \PhpEtlBundle \Repository \EtlExecutionRepository ;
10+ use function Clue \StreamFilter \fun ;
1011
1112class ChainProcessorsManager
1213{
@@ -124,7 +125,18 @@ public function executeFromEtlEntity(EtlExecution $execution, iterable $iterator
124125 ];
125126
126127 // Start the process.
127- $ processor ->process ($ iterator , $ params , $ observerCallback );
128+ $ observerProcessTime = 0 ;
129+ $ processor ->process ($ iterator , $ params , function (array $ operationStates , int $ processedItems , int $ returnedItems , bool $ hasFinished = false ) use ($ observerCallback , &$ observerProcessTime , $ execution ) {
130+ $ observerCallback ($ operationStates , $ processedItems , $ returnedItems , $ hasFinished );
131+
132+ if ((time () - $ observerProcessTime ) > 5 ) {
133+ $ execution = $ this ->etlExecutionRepository ->find ($ execution ->getId ());
134+ $ execution ->setStepStats (serialize ($ operationStates ));
135+ $ this ->etlExecutionRepository ->save ($ execution );
136+
137+ $ observerProcessTime = time ();
138+ }
139+ });
128140 $ execution = $ this ->etlExecutionRepository ->find ($ execution ->getId ());
129141 $ execution ->setStatus (EtlExecution::STATUS_SUCCESS );
130142 } catch (\Throwable $ exception ) {
@@ -136,7 +148,6 @@ public function executeFromEtlEntity(EtlExecution $execution, iterable $iterator
136148 } finally {
137149 $ execution ->setEndTime (new \DateTime ());
138150 $ execution ->setRunTime (time () - $ execution ->getStartTime ()->getTimestamp ());
139- $ execution ->setStepStats ('[] ' ); // To be developped
140151 $ this ->etlExecutionRepository ->save ($ execution );
141152 }
142153 }
0 commit comments