Skip to content

Commit 2652e52

Browse files
Covering sync and async batching with local files
1 parent 78a0e49 commit 2652e52

File tree

16 files changed

+450
-14
lines changed

16 files changed

+450
-14
lines changed

config/config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
imports:
2+
- { resource: "./config/messenger.yaml" }
23
- { resource: "./config/sylius_grid.yaml" }
34
- { resource: "./config/sylius_resource.yaml" }
45
- { resource: "./config/twig_hooks.yaml" }

config/config/messenger.yaml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
parameters:
2+
env(SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_COMMAND_DSN): 'sync://'
3+
env(SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_EVENT_DSN): 'sync://'
4+
5+
sylius_import_export_messenger_transport_export_command_dsn: '%env(resolve:SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_COMMAND_DSN)%'
6+
sylius_import_export_messenger_transport_export_event_dsn: '%env(resolve:SYLIUS_IMPORT_EXPORT_MESSENGER_TRANSPORT_EXPORT_EVENT_DSN)%'
7+
8+
framework:
9+
messenger:
10+
transports:
11+
export_command:
12+
dsn: '%sylius_import_export_messenger_transport_export_command_dsn%'
13+
failure_transport: 'export_event'
14+
export_event:
15+
dsn: '%sylius_import_export_messenger_transport_export_event_dsn%'
16+
routing:
17+
'Sylius\GridImportExport\Messenger\Command\ExportCommand': export_command
18+
'Sylius\GridImportExport\Messenger\Event\ExportProcessCompleted': export_event
19+
buses:
20+
sylius_import_export.export.command_bus:
21+
middleware:
22+
- doctrine_transaction
23+
- sylius_import_export.messenger.middleware.export_batch_tracking
24+
sylius_import_export.export.event_bus: ~

config/doctrine/ExportProcess.orm.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
<field name="format" column="format" type="string" />
1818
<field name="parameters" column="parameters" type="json" />
1919
<field name="resourceIds" column="resource_ids" type="json" nullable="true" />
20+
<field name="temporaryDataStorage" column="temporary_data_storage" type="string" nullable="true" />
21+
<field name="batchesCount" column="batches_count" type="integer" />
2022
<field name="createdAt" column="created_at" type="datetime">
2123
<gedmo:timestampable on="create" />
2224
</field>

config/services.xml

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111

1212
<services>
1313
<service id="sylius_import_export.messenger.command_handler.create_export_process" class="Sylius\GridImportExport\Messenger\Handler\CreateExportProcessHandler">
14-
<argument type="service" id="sylius_import_export.custom_factory.process" />
15-
<argument type="service" id="sylius_grid_import_export.repository.process" />
16-
<argument type="service" id="sylius.command_bus" />
14+
<argument type="service" id="sylius_import_export.factory.process" />
15+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
16+
<argument type="service" id="sylius_import_export.export.command_bus" />
17+
<argument type="service" id="sylius_import_export.export.batched_data_manager" />
1718

1819
<tag name="messenger.message_handler" bus="sylius.command_bus" />
1920
</service>
@@ -23,8 +24,24 @@
2324
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
2425
<argument type="service" id="sylius_import_export.registry.resource_data_provider" />
2526
<argument type="service" id="sylius_grid_import_export.exporter_resolver" />
27+
<argument type="service" id="sylius_import_export.export.batched_data_manager" />
2628

27-
<tag name="messenger.message_handler" bus="sylius.command_bus" />
29+
<tag name="messenger.message_handler" bus="sylius_import_export.export.command_bus" />
30+
</service>
31+
32+
<service id="sylius_import_export.messenger.event_handler.export_completed" class="Sylius\GridImportExport\Messenger\Handler\ExportCompletedHandler">
33+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
34+
<argument type="service" id="sylius_grid_import_export.exporter_resolver" />
35+
<argument type="service" id="sylius_import_export.export.batched_data_manager" />
36+
37+
<tag name="messenger.message_handler" bus="sylius_import_export.export.event_bus" />
38+
</service>
39+
40+
<service id="sylius_import_export.messenger.middleware.export_batch_tracking" class="Sylius\GridImportExport\Messenger\Middleware\ExportBatchTrackingMiddleware">
41+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
42+
<argument type="service" id="sylius_import_export.export.event_bus" />
43+
44+
<tag name="messenger.middleware" />
2845
</service>
2946

3047
<service id="sylius_import_export.twig.component.export_resource" class="Sylius\GridImportExport\Twig\Component\ExportResourceFormComponent">
@@ -71,6 +88,11 @@
7188
<argument type="service" id="sylius_grid_import_export.factory.process_export" />
7289
</service>
7390

91+
<service id="sylius_import_export.export.batched_data_manager" class="Sylius\GridImportExport\Manager\BatchedExportDataManager">
92+
<argument type="service" id="sylius_grid_import_export.repository.process_export" />
93+
<argument>%sylius_grid_import_export.export_files_directory%</argument>
94+
</service>
95+
7496
<service id="sylius_grid_import_export.menu.admin_listener" class="Sylius\GridImportExport\Menu\AdminImportExportMenuListener">
7597
<argument type="service" id="sylius_grid_import_export.menu.reorder" />
7698
<tag name="kernel.event_listener" method="buildMenu" event="sylius.menu.admin.main" />

src/Entity/ExportProcess.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class ExportProcess extends Process implements ExportProcessInterface
2323

2424
protected array $resourceIds = [];
2525

26+
protected int $batchesCount = 0;
27+
28+
protected ?string $temporaryDataStorage = null;
29+
2630
public function getType(): string
2731
{
2832
return ExportProcessInterface::TYPE;
@@ -67,4 +71,24 @@ public function setResourceIds(array $resourceIds): void
6771
{
6872
$this->resourceIds = $resourceIds;
6973
}
74+
75+
public function getBatchesCount(): int
76+
{
77+
return $this->batchesCount;
78+
}
79+
80+
public function setBatchesCount(int $count): void
81+
{
82+
$this->batchesCount = $count;
83+
}
84+
85+
public function getTemporaryDataStorage(): ?string
86+
{
87+
return $this->temporaryDataStorage;
88+
}
89+
90+
public function setTemporaryDataStorage(?string $storage): void
91+
{
92+
$this->temporaryDataStorage = $storage;
93+
}
7094
}

src/Entity/ExportProcessInterface.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
<?php
22

3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
312
declare(strict_types=1);
413

514
namespace Sylius\GridImportExport\Entity;
@@ -24,4 +33,11 @@ public function getParameters(): array;
2433

2534
public function setParameters(array $parameters): void;
2635

36+
public function getBatchesCount(): int;
37+
38+
public function setBatchesCount(int $count): void;
39+
40+
public function getTemporaryDataStorage(): ?string;
41+
42+
public function setTemporaryDataStorage(?string $storage): void;
2743
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Sylius\GridImportExport\Manager;
15+
16+
use Sylius\GridImportExport\Entity\ExportProcessInterface;
17+
use Sylius\GridImportExport\Exporter\CsvExporter;
18+
use Sylius\GridImportExport\Exporter\JsonExporter;
19+
use Sylius\Resource\Doctrine\Persistence\RepositoryInterface;
20+
21+
final class BatchedExportDataManager implements BatchedExportDataManagerInterface
22+
{
23+
private const FILE_BASED_FORMATS = [
24+
CsvExporter::FORMAT,
25+
JsonExporter::FORMAT,
26+
];
27+
28+
private string $temporaryExportsDirectory;
29+
30+
/** @param RepositoryInterface<ExportProcessInterface> $processRepository */
31+
public function __construct(
32+
protected RepositoryInterface $processRepository,
33+
string $temporaryDirectory,
34+
) {
35+
$this->temporaryExportsDirectory = sprintf('%s/ongoing/', $temporaryDirectory);
36+
}
37+
38+
public function createStorage(ExportProcessInterface $exportProcess): void
39+
{
40+
if (in_array($exportProcess->getFormat(), self::FILE_BASED_FORMATS)) {
41+
$ongoingProcessDirectory = $this->temporaryExportsDirectory . '/' . $exportProcess->getUuid();
42+
$exportProcess->setTemporaryDataStorage($ongoingProcessDirectory);
43+
44+
if (!is_dir($ongoingProcessDirectory)) {
45+
mkdir($ongoingProcessDirectory, recursive: true);
46+
}
47+
}
48+
}
49+
50+
public function getStorage(ExportProcessInterface $exportProcess): ?string
51+
{
52+
return $exportProcess->getTemporaryDataStorage();
53+
}
54+
55+
public function setStorage(ExportProcessInterface $exportProcess): void
56+
{
57+
$exportProcess->setTemporaryDataStorage(
58+
$this->temporaryExportsDirectory . '/' . $exportProcess->getUuid(),
59+
);
60+
}
61+
62+
public function resetStorage(ExportProcessInterface $exportProcess): void
63+
{
64+
$exportProcess->setTemporaryDataStorage(null);
65+
}
66+
67+
public function saveBatch(ExportProcessInterface $exportProcess, array $data): void
68+
{
69+
if (null === $this->getStorage($exportProcess)) {
70+
throw new \InvalidArgumentException(
71+
sprintf(
72+
'No storage set on process with uuid "%s"',
73+
$exportProcess->getUuid(),
74+
),
75+
);
76+
}
77+
78+
file_put_contents(
79+
rtrim($this->getStorage($exportProcess), '/') . '/' . uniqid() . '.json',
80+
json_encode($data, \JSON_THROW_ON_ERROR),
81+
);
82+
}
83+
84+
public function getBatchedData(ExportProcessInterface $exportProcess): iterable
85+
{
86+
if (null === $storage = $this->getStorage($exportProcess)) {
87+
throw new \InvalidArgumentException(sprintf('No storage set on process with uuid "%s"', $exportProcess->getUuid()));
88+
}
89+
90+
foreach ($this->getFilesIn($storage) as $file) {
91+
$filePath = $storage . '/' . $file;
92+
if (is_file($filePath)) {
93+
yield json_decode((string) file_get_contents($filePath), true);
94+
}
95+
}
96+
}
97+
98+
public function deleteBatchedData(ExportProcessInterface $exportProcess): void
99+
{
100+
$storage = $this->getStorage($exportProcess);
101+
if (null === $storage || !is_dir($storage)) {
102+
return;
103+
}
104+
105+
foreach ($this->getFilesIn($storage) as $file) {
106+
unlink($storage . '/' . $file);
107+
}
108+
109+
rmdir($storage);
110+
}
111+
112+
private function getFilesIn(string $directory): array
113+
{
114+
$files = scandir($directory);
115+
116+
return array_filter($files ?: [], static function ($file) {
117+
return str_ends_with($file, '.json');
118+
});
119+
}
120+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Sylius\GridImportExport\Manager;
15+
16+
use Sylius\GridImportExport\Entity\ExportProcessInterface;
17+
18+
interface BatchedExportDataManagerInterface
19+
{
20+
public function createStorage(ExportProcessInterface $exportProcess): void;
21+
22+
public function getStorage(ExportProcessInterface $exportProcess): ?string;
23+
24+
public function setStorage(ExportProcessInterface $exportProcess): void;
25+
26+
public function resetStorage(ExportProcessInterface $exportProcess): void;
27+
28+
public function saveBatch(ExportProcessInterface $exportProcess, array $data): void;
29+
30+
/** @return iterable<array<string, mixed>> */
31+
public function getBatchedData(ExportProcessInterface $exportProcess): iterable;
32+
33+
public function deleteBatchedData(ExportProcessInterface $exportProcess): void;
34+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Sylius package.
5+
*
6+
* (c) Sylius Sp. z o.o.
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace Sylius\GridImportExport\Messenger\Event;
15+
16+
class ExportProcessCompleted
17+
{
18+
public function __construct(
19+
public string $processId,
20+
) {
21+
}
22+
}

src/Messenger/Handler/CreateExportProcessHandler.php

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515

1616
use Sylius\GridImportExport\Entity\ProcessInterface;
1717
use Sylius\GridImportExport\Factory\ProcessFactoryInterface;
18+
use Sylius\GridImportExport\Manager\BatchedExportDataManagerInterface;
1819
use Sylius\GridImportExport\Messenger\Command\CreateExportProcess;
1920
use Sylius\GridImportExport\Messenger\Command\ExportCommand;
21+
use Sylius\GridImportExport\Messenger\Stamp\ExportBatchCounterStamp;
2022
use Sylius\Resource\Doctrine\Persistence\RepositoryInterface;
2123
use Symfony\Component\Messenger\MessageBusInterface;
2224

@@ -27,24 +29,30 @@ class CreateExportProcessHandler
2729
* @param int<1, max> $batchSize
2830
*/
2931
public function __construct(
30-
public ProcessFactoryInterface $processFactory,
31-
public RepositoryInterface $processRepository,
32-
public MessageBusInterface $messageBus,
33-
public int $batchSize = 100,
32+
protected ProcessFactoryInterface $processFactory,
33+
protected RepositoryInterface $processRepository,
34+
protected MessageBusInterface $messageBus,
35+
protected BatchedExportDataManagerInterface $batchedDataManager,
36+
protected int $batchSize = 100,
3437
) {
3538
}
3639

3740
public function __invoke(CreateExportProcess $command): void
3841
{
3942
$process = $this->processFactory->createExportProcess($command);
4043

44+
$this->batchedDataManager->createStorage($process);
45+
46+
$batchesCount = (int) ceil(count($process->getResourceIds()) / $this->batchSize);
47+
$process->setBatchesCount($batchesCount);
48+
4149
$this->processRepository->add($process);
4250

4351
foreach (array_chunk($process->getResourceIds(), $this->batchSize) as $batch) {
4452
$this->messageBus->dispatch(new ExportCommand(
4553
processId: $process->getUuid(),
4654
resourceIds: $batch,
47-
));
55+
), [new ExportBatchCounterStamp()]);
4856
}
4957
}
5058
}

0 commit comments

Comments
 (0)