Skip to content

Commit ae882ac

Browse files
Covering sync and async batching with local files
1 parent 78a0e49 commit ae882ac

File tree

14 files changed

+320
-14
lines changed

14 files changed

+320
-14
lines changed

config/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
imports:
2+
- { resource: "./config/messenger.yaml" }
23
- { resource: "./config/sylius_grid.yaml" }
34
- { resource: "./config/sylius_resource.yaml" }
45
- { resource: "./config/twig_hooks.yaml" }

config/config/messenger.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
parameters:
2+
env(SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_COMMAND_DSN): 'sync://'
3+
env(SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_EVENT_DSN): 'sync://'
4+
5+
sylius_import_export_messenger_transport_export_command_dsn: '%env(resolve:SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_COMMAND_DSN)%'
6+
sylius_import_export_messenger_transport_export_event_dsn: '%env(resolve:SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_EVENT_DSN)%'
7+
8+
framework:
9+
messenger:
10+
transports:
11+
export_command:
12+
dsn: '%sylius_import_export_messenger_transport_export_command_dsn%'
13+
failure_transport: 'export_event'
14+
export_event:
15+
dsn: '%sylius_import_export_messenger_transport_export_event_dsn%'
16+
routing:
17+
'Sylius\GridImportExport\Messenger\Command\ExportCommand': export_command
18+
'Sylius\GridImportExport\Messenger\Event\ExportProcessCompleted': export_event
19+
buses:
20+
sylius_import_export.export.command_bus:
21+
middleware:
22+
- doctrine_transaction
23+
- sylius_import_export.messenger.middleware.export_batch_tracking
24+
sylius_import_export.export.event_bus: ~

config/doctrine/ExportProcess.orm.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
<field name="format" column="format" type="string" />
1818
<field name="parameters" column="parameters" type="json" />
1919
<field name="resourceIds" column="resource_ids" type="json" nullable="true" />
20+
<field name="temporaryDirectory" column="temporary_directory" type="string" nullable="true" />
21+
<field name="batchesCount" column="batches_count" type="integer" />
2022
<field name="createdAt" column="created_at" type="datetime">
2123
<gedmo:timestampable on="create" />
2224
</field>

config/services.xml

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111

1212
<services>
1313
<service id="sylius_import_export.messenger.command_handler.create_export_process" class="Sylius\GridImportExport\Messenger\Handler\CreateExportProcessHandler">
14-
<argument type="service" id="sylius_import_export.custom_factory.process" />
15-
<argument type="service" id="sylius_grid_import_export.repository.process" />
16-
<argument type="service" id="sylius.command_bus" />
14+
<argument type="service" id="sylius_import_export.factory.process" />
15+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
16+
<argument type="service" id="sylius_import_export.export.command_bus" />
17+
<argument>%sylius_grid_import_export.export_files_directory%</argument>
1718

1819
<tag name="messenger.message_handler" bus="sylius.command_bus" />
1920
</service>
@@ -24,7 +25,21 @@
2425
<argument type="service" id="sylius_import_export.registry.resource_data_provider" />
2526
<argument type="service" id="sylius_grid_import_export.exporter_resolver" />
2627

27-
<tag name="messenger.message_handler" bus="sylius.command_bus" />
28+
<tag name="messenger.message_handler" bus="sylius_import_export.export.command_bus" />
29+
</service>
30+
31+
<service id="sylius_import_export.messenger.event_handler.export_completed" class="Sylius\GridImportExport\Messenger\Handler\ExportCompletedHandler">
32+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
33+
<argument type="service" id="sylius_grid_import_export.exporter_resolver" />
34+
35+
<tag name="messenger.message_handler" bus="sylius_import_export.export.event_bus" />
36+
</service>
37+
38+
<service id="sylius_import_export.messenger.middleware.export_batch_tracking" class="Sylius\GridImportExport\Messenger\Middleware\ExportBatchTrackingMiddleware">
39+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
40+
<argument type="service" id="sylius_import_export.export.event_bus" />
41+
42+
<tag name="messenger.middleware" />
2843
</service>
2944

3045
<service id="sylius_import_export.twig.component.export_resource" class="Sylius\GridImportExport\Twig\Component\ExportResourceFormComponent">

src/Entity/ExportProcess.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class ExportProcess extends Process implements ExportProcessInterface
2323

2424
protected array $resourceIds = [];
2525

26+
protected int $batchesCount = 0;
27+
28+
protected ?string $temporaryDirectory = null;
29+
2630
public function getType(): string
2731
{
2832
return ExportProcessInterface::TYPE;
@@ -67,4 +71,24 @@ public function setResourceIds(array $resourceIds): void
6771
{
6872
$this->resourceIds = $resourceIds;
6973
}
74+
75+
public function getBatchesCount(): int
76+
{
77+
return $this->batchesCount;
78+
}
79+
80+
public function setBatchesCount(int $count): void
81+
{
82+
$this->batchesCount = $count;
83+
}
84+
85+
public function getTemporaryDirectory(): ?string
86+
{
87+
return $this->temporaryDirectory;
88+
}
89+
90+
public function setTemporaryDirectory(?string $directory): void
91+
{
92+
$this->temporaryDirectory = $directory;
93+
}
7094
}

src/Entity/ExportProcessInterface.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
<?php
22

3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
312
declare(strict_types=1);
413

514
namespace Sylius\GridImportExport\Entity;
@@ -24,4 +33,11 @@ public function getParameters(): array;
2433

2534
public function setParameters(array $parameters): void;
2635

36+
public function getBatchesCount(): int;
37+
38+
public function setBatchesCount(int $count): void;
39+
40+
public function getTemporaryDirectory(): ?string;
41+
42+
public function setTemporaryDirectory(?string $directory): void;
2743
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Sylius\GridImportExport\Messenger\Event;
15+
16+
class ExportProcessCompleted
17+
{
18+
public function __construct(
19+
public string $processId,
20+
) {
21+
}
22+
}

src/Messenger/Handler/CreateExportProcessHandler.php

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,37 +14,61 @@
1414
namespace Sylius\GridImportExport\Messenger\Handler;
1515

1616
use Sylius\GridImportExport\Entity\ProcessInterface;
17+
use Sylius\GridImportExport\Exporter\CsvExporter;
18+
use Sylius\GridImportExport\Exporter\JsonExporter;
1719
use Sylius\GridImportExport\Factory\ProcessFactoryInterface;
1820
use Sylius\GridImportExport\Messenger\Command\CreateExportProcess;
1921
use Sylius\GridImportExport\Messenger\Command\ExportCommand;
22+
use Sylius\GridImportExport\Messenger\Stamp\ExportBatchCounterStamp;
2023
use Sylius\Resource\Doctrine\Persistence\RepositoryInterface;
2124
use Symfony\Component\Messenger\MessageBusInterface;
2225

2326
class CreateExportProcessHandler
2427
{
28+
private const FILE_BASED_FORMATS = [
29+
CsvExporter::FORMAT,
30+
JsonExporter::FORMAT,
31+
];
32+
33+
protected string $temporaryExportDirectory;
34+
2535
/**
2636
* @param RepositoryInterface<ProcessInterface> $processRepository
2737
* @param int<1, max> $batchSize
2838
*/
2939
public function __construct(
30-
public ProcessFactoryInterface $processFactory,
31-
public RepositoryInterface $processRepository,
32-
public MessageBusInterface $messageBus,
33-
public int $batchSize = 100,
40+
protected ProcessFactoryInterface $processFactory,
41+
protected RepositoryInterface $processRepository,
42+
protected MessageBusInterface $messageBus,
43+
string $exportDirectory,
44+
protected int $batchSize = 100,
3445
) {
46+
$this->temporaryExportDirectory = sprintf('%s/ongoing/', $exportDirectory);
3547
}
3648

3749
public function __invoke(CreateExportProcess $command): void
3850
{
3951
$process = $this->processFactory->createExportProcess($command);
4052

53+
if (in_array($command->format, self::FILE_BASED_FORMATS)) {
54+
$ongoingProcessDirectory = $this->temporaryExportDirectory . '/' . $process->getUuid();
55+
$process->setTemporaryDirectory($ongoingProcessDirectory);
56+
57+
if (!is_dir($ongoingProcessDirectory)) {
58+
mkdir($ongoingProcessDirectory, recursive: true);
59+
}
60+
}
61+
62+
$batchesCount = (int) ceil(count($process->getResourceIds()) / $this->batchSize);
63+
$process->setBatchesCount($batchesCount);
64+
4165
$this->processRepository->add($process);
4266

4367
foreach (array_chunk($process->getResourceIds(), $this->batchSize) as $batch) {
4468
$this->messageBus->dispatch(new ExportCommand(
4569
processId: $process->getUuid(),
4670
resourceIds: $batch,
47-
));
71+
), [new ExportBatchCounterStamp()]);
4872
}
4973
}
5074
}

src/Messenger/Handler/ExportCommandHandler.php

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
namespace Sylius\GridImportExport\Messenger\Handler;
1515

1616
use Sylius\GridImportExport\Entity\ExportProcessInterface;
17+
use Sylius\GridImportExport\Exception\ExportFailedException;
1718
use Sylius\GridImportExport\Messenger\Command\ExportCommand;
1819
use Sylius\GridImportExport\Provider\Registry\ResourceDataProviderRegistryInterface;
1920
use Sylius\GridImportExport\Resolver\ExporterResolverInterface;
@@ -24,18 +25,18 @@ class ExportCommandHandler
2425
{
2526
/** @param RepositoryInterface<ExportProcessInterface> $processRepository */
2627
public function __construct(
27-
public RegistryInterface $metadataRegistry,
28-
public RepositoryInterface $processRepository,
29-
public ResourceDataProviderRegistryInterface $dataProviderRegistry,
30-
public ExporterResolverInterface $exporterResolver,
28+
protected RegistryInterface $metadataRegistry,
29+
protected RepositoryInterface $processRepository,
30+
protected ResourceDataProviderRegistryInterface $dataProviderRegistry,
31+
protected ExporterResolverInterface $exporterResolver,
3132
) {
3233
}
3334

3435
public function __invoke(ExportCommand $command): void
3536
{
3637
$process = $this->processRepository->find($command->processId);
3738
if (null === $process) {
38-
return;
39+
throw new ExportFailedException(sprintf('Process with uuid "%s" not found.', $command->processId));
3940
}
4041

4142
$resourceMetadata = $this->metadataRegistry->get($process->getResource());
@@ -45,6 +46,18 @@ public function __invoke(ExportCommand $command): void
4546
->getData($resourceMetadata, $process->getGrid(), $command->resourceIds, $process->getParameters())
4647
;
4748

49+
$process->setBatchesCount($process->getBatchesCount() - 1);
50+
51+
$jsonifiedData = json_encode($data, \JSON_THROW_ON_ERROR);
52+
if (null !== $process->getTemporaryDirectory()) {
53+
file_put_contents(
54+
$process->getTemporaryDirectory() . '/' . uniqid() . '.json',
55+
$jsonifiedData,
56+
);
57+
58+
return;
59+
}
60+
4861
try {
4962
$resolver = $this->exporterResolver->resolve($process->getFormat());
5063
$outputPath = $resolver->export($data);
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Sylius\GridImportExport\Messenger\Handler;
15+
16+
use Sylius\GridImportExport\Entity\ExportProcessInterface;
17+
use Sylius\GridImportExport\Exception\ExportFailedException;
18+
use Sylius\GridImportExport\Messenger\Event\ExportProcessCompleted;
19+
use Sylius\GridImportExport\Resolver\ExporterResolverInterface;
20+
use Sylius\Resource\Doctrine\Persistence\RepositoryInterface;
21+
22+
class ExportCompletedHandler
23+
{
24+
/** @param RepositoryInterface<ExportProcessInterface> $processRepository */
25+
public function __construct(
26+
private RepositoryInterface $processRepository,
27+
private ExporterResolverInterface $exporterResolver,
28+
) {
29+
}
30+
31+
public function __invoke(ExportProcessCompleted $event): void
32+
{
33+
$process = $this->processRepository->find($event->processId);
34+
if (null === $process) {
35+
throw new ExportFailedException(sprintf('Process with uuid "%s" not found.', $event->processId));
36+
}
37+
38+
$temporaryDirectory = (string) $process->getTemporaryDirectory();
39+
$files = scandir($temporaryDirectory);
40+
41+
$foundFiles = array_filter($files ?: [], static function ($file) {
42+
return str_ends_with($file, '.json');
43+
});
44+
$data = [];
45+
foreach ($foundFiles as $file) {
46+
$filePath = $temporaryDirectory . '/' . $file;
47+
if (is_file($filePath)) {
48+
$data[] = json_decode((string) file_get_contents($filePath), true);
49+
}
50+
}
51+
if ([] === $data) {
52+
throw new ExportFailedException(sprintf('No data found for process with uuid "%s".', $event->processId));
53+
}
54+
55+
try {
56+
$resolver = $this->exporterResolver->resolve($process->getFormat());
57+
$outputPath = $resolver->export($data);
58+
59+
$process->setStatus('success');
60+
$process->setOutput($outputPath);
61+
} catch (\Throwable $e) {
62+
$process->setStatus('failed');
63+
$process->setErrorMessage($e->getMessage());
64+
}
65+
66+
$process->setBatchesCount(0);
67+
$process->setTemporaryDirectory(null);
68+
69+
$this->processRepository->add($process);
70+
71+
if (is_dir($temporaryDirectory)) {
72+
foreach ($foundFiles as $file) {
73+
unlink($temporaryDirectory . '/' . $file);
74+
}
75+
76+
rmdir($temporaryDirectory);
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)