33namespace Oliverde8 \PhpEtlBundle \Services ;
44
55use Oliverde8 \Component \PhpEtl \ChainProcessor ;
6- use Oliverde8 \Component \PhpEtl \Exception \ChainOperationException ;
7- use Oliverde8 \Component \PhpEtl \Item \DataItem ;
8- use Oliverde8 \Component \PhpEtl \Item \DataItemInterface ;
96use Oliverde8 \PhpEtlBundle \Entity \EtlExecution ;
107use Oliverde8 \PhpEtlBundle \Exception \UnknownChainException ;
8+ use Oliverde8 \PhpEtlBundle \Factory \ChainFactory ;
119use Oliverde8 \PhpEtlBundle \Repository \EtlExecutionRepository ;
12- use Psr \Container \ContainerInterface ;
13- use Psr \Log \LoggerInterface ;
1410
1511class ChainProcessorsManager
1612{
17- protected ContainerInterface $ container ;
18-
1913 protected EtlExecutionRepository $ etlExecutionRepository ;
20-
2114 protected LoggerFactory $ loggerFactory ;
22-
15+ protected ChainFactory $ chainFactory ;
2316 protected array $ definitions ;
17+ protected array $ rewDefinitions ;
2418
2519 public function __construct (
26- ContainerInterface $ container ,
2720 EtlExecutionRepository $ etlExecutionRepository ,
2821 LoggerFactory $ loggerFactory ,
29- array $ definitions
22+ ChainFactory $ chainFactory ,
23+ array $ definitions ,
24+ array $ rawDefinitions
3025 ) {
31- $ this ->container = $ container ;
3226 $ this ->etlExecutionRepository = $ etlExecutionRepository ;
3327 $ this ->loggerFactory = $ loggerFactory ;
28+ $ this ->chainFactory = $ chainFactory ;
3429 $ this ->definitions = $ definitions ;
30+ $ this ->rewDefinitions = $ rawDefinitions ;
3531 }
3632
3733 /**
3834 * @throws UnknownChainException
3935 */
40- public function getDefinition (string $ chainName ): string
36+ public function getRawDefinition (string $ chainName ): string
4137 {
42- if (!isset ($ this ->definitions [$ chainName ])) {
38+ if (!isset ($ this ->rewDefinitions [$ chainName ])) {
4339 $ alternatives = [];
44- foreach (array_keys ($ this ->definitions ) as $ knownId ) {
40+ foreach (array_keys ($ this ->rewDefinitions ) as $ knownId ) {
4541 $ lev = levenshtein ($ chainName , $ knownId );
4642 if ($ lev <= \strlen ($ chainName ) / 3 || str_contains ($ knownId , $ chainName )) {
4743 $ alternatives [] = $ knownId ;
@@ -51,18 +47,25 @@ public function getDefinition(string $chainName): string
5147 throw new UnknownChainException ("Unknown chain ' $ chainName', did you mean: " . implode (", " , $ alternatives ));
5248 }
5349
54- return $ this ->definitions [$ chainName ];
50+ return $ this ->rewDefinitions [$ chainName ];
5551 }
5652
57- public function getDefinitions (): array
53+ public function getRewDefinitions (): array
5854 {
59- return $ this ->definitions ;
55+ return $ this ->rewDefinitions ;
6056 }
6157
62- public function getProcessor (string $ chainName ): ChainProcessor
58+ public function getProcessor (string $ chainName, array $ options ): ChainProcessor
6359 {
64- // TODO Think about either creating the processor & runtime or injecting them into the constructor like the definitions.
65- return $ this ->container ->get ("oliverde8.etl.chain. $ chainName " );
60+ $ this ->getRawDefinition ($ chainName );
61+ $ definition = $ this ->definitions [$ chainName ];
62+ $ chain = $ definition ['chain ' ];
63+ $ maxAsynchronousItems = $ definition ['maxAsynchronousItems ' ] ?? 20 ;
64+ $ defaultOptions = $ definition ['defaultOptions ' ] ?? [];
65+
66+ $ options = array_merge ($ defaultOptions , $ options );
67+
68+ return $ this ->chainFactory ->create ($ chain , $ options , $ maxAsynchronousItems );
6669 }
6770
6871 /**
@@ -76,7 +79,7 @@ public function getProcessor(string $chainName): ChainProcessor
7679 */
7780 public function execute (string $ chainName , iterable $ iterator , array $ params )
7881 {
79- $ definition = $ this ->getDefinition ($ chainName );
82+ $ definition = $ this ->getRawDefinition ($ chainName );
8083
8184 $ inputData = ["Iterator! Can't show input data " ];
8285 if (is_array ($ iterator )) {
@@ -95,7 +98,7 @@ public function execute(string $chainName, iterable $iterator, array $params)
9598 * Execute a chain from it's entity.
9699 *
97100 */
98- public function executeFromEtlEntity (EtlExecution $ execution , iterable $ iterator = null )
101+ public function executeFromEtlEntity (EtlExecution $ execution , iterable $ iterator = null ): void
99102 {
100103 $ chainName = $ execution ->getName ();
101104 $ logger = $ this ->loggerFactory ->get ($ execution );
@@ -108,8 +111,8 @@ public function executeFromEtlEntity(EtlExecution $execution, iterable $iterator
108111 $ this ->etlExecutionRepository ->save ($ execution );
109112
110113 // Build the processor.
111- $ processor = $ this ->getProcessor ($ chainName );
112114 $ params = json_decode ($ execution ->getInputOptions (), true );
115+ $ processor = $ this ->getProcessor ($ chainName , $ params );
113116
114117 if (is_null ($ iterator )) {
115118 $ iterator = new \ArrayIterator (json_decode ($ execution ->getInputData (), true ));
@@ -128,7 +131,7 @@ public function executeFromEtlEntity(EtlExecution $execution, iterable $iterator
128131 $ execution = $ this ->etlExecutionRepository ->find ($ execution ->getId ());
129132 $ execution ->setFailTime (new \DateTime ());
130133 $ execution ->setStatus (EtlExecution::STATUS_FAILURE );
131- $ execution ->setErrorMessage ($ this ->getFullExeptionTrace ($ exception ));
134+ $ execution ->setErrorMessage ($ this ->getFullExceptionTrace ($ exception ));
132135 throw $ exception ;
133136 } finally {
134137 $ execution ->setEndTime (new \DateTime ());
@@ -138,7 +141,7 @@ public function executeFromEtlEntity(EtlExecution $execution, iterable $iterator
138141 }
139142 }
140143
141- protected function getFullExeptionTrace (\Throwable $ exception )
144+ protected function getFullExceptionTrace (\Throwable $ exception ): string
142145 {
143146 $ message = '' ;
144147 do {
0 commit comments