diff --git a/CHANGELOG.md b/CHANGELOG.md
index 35ebc94a..e30b45d8 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
## Unreleased
### Add
- Add Recently viewed component
+- Add worker command for product export - reduce memory usage and increase performance for big catalogs
### Change
- Upgrade Web Components version to v5.2.1
diff --git a/README.md b/README.md
index 07475dcd..17df6358 100755
--- a/README.md
+++ b/README.md
@@ -364,6 +364,15 @@ This is especially useful if you have several SalesChannels with different domai
APP_URL=http://saleschannel-domain.com php [SHOPWARE_ROOT]/bin/console factfinder:data:export
+##### Running export with a worker
+
+Since version 7.3.0, a new export flow based on worker processes is available.
+The biggest advantage of this export method is its reduced memory usage, which is especially helpful when you have a large or complex product catalog.
+Currently, this command is only available from the CLI and can be executed with:
+
+ php [SHOPWARE_ROOT]/bin/console factfinder:product-data:export
+
+The arguments and options are the same as described above for `factfinder:data:export`.
#### Selecting Categories for CMS Export
With CMS Export we introduced custom field for CategoryEntity by which we filter the Categories going to be exported.
diff --git a/src/Command/ExportBatchCommand.php b/src/Command/ExportBatchCommand.php
new file mode 100644
index 00000000..798eda8c
--- /dev/null
+++ b/src/Command/ExportBatchCommand.php
@@ -0,0 +1,114 @@
+addArgument('sales_channel', InputArgument::REQUIRED, 'ID of the sales channel');
+ $this->addArgument('language', InputArgument::REQUIRED, 'ID of the language');
+ $this->addArgument('offset', InputArgument::REQUIRED, 'Offset');
+ $this->addArgument('limit', InputArgument::REQUIRED, 'Limit');
+ $this->addArgument('file_path', InputArgument::REQUIRED, 'Path to output CSV');
+ }
+
+ /**
+  * Exports one batch of products and appends it to the given CSV file.
+  *
+  * Reads the sales channel, language, offset, limit and file path from the
+  * command arguments, generates the batch through the feed service and prints
+  * a JSON object with the number of processed entities ("count") and the
+  * current and peak memory usage in MB ("memory", "peak").
+  * The CSV header is written only for the batch starting at offset 0.
+  *
+  * @throws \RuntimeException when the output file cannot be opened for appending
+  */
+ protected function execute(InputInterface $input, OutputInterface $output): int
+ {
+     $offset = (int) $input->getArgument('offset');
+     $limit = (int) $input->getArgument('limit');
+     $filePath = $input->getArgument('file_path');
+     $salesChannelId = $input->getArgument('sales_channel');
+     $salesChannel = null;
+
+     // An empty sales channel argument means "no specific sales channel".
+     if (!empty($salesChannelId)) {
+         $salesChannel = $this->salesChannelRepository->search(
+             new Criteria([$salesChannelId]),
+             new Context(new SystemSource())
+         )->first();
+     }
+
+     $context = $this->channelService->getSalesChannelContext(
+         $salesChannel,
+         $input->getArgument('language')
+     );
+
+     $feedService = $this->feedFactory->create($context, ProductEntity::class);
+     $feedColumns = $this->getFeedColumns('products', ProductEntity::class);
+
+     // Append mode: every batch adds its rows to the file created by the caller.
+     $fileResource = fopen($filePath, 'a');
+
+     if ($fileResource === false) {
+         throw new \RuntimeException(sprintf('Unable to open export file "%s" for appending', $filePath));
+     }
+
+     try {
+         $processedCount = $feedService->generateBatch(
+             new CsvFile($fileResource),
+             $feedColumns,
+             $offset,
+             $limit,
+             $offset === 0
+         );
+     } finally {
+         // Release the handle even when the batch generation throws.
+         fclose($fileResource);
+     }
+
+     $memoryUsageMB = memory_get_usage(true) / 1024 / 1024;
+     $peakMemoryMB = memory_get_peak_usage(true) / 1024 / 1024;
+
+     $result = [
+         'count' => $processedCount,
+         'memory' => round($memoryUsageMB, 2),
+         'peak' => round($peakMemoryMB, 2),
+     ];
+
+     $output->write(json_encode($result));
+
+     return Command::SUCCESS;
+ }
+
+ /**
+  * Builds the de-duplicated, re-indexed list of feed column names.
+  *
+  * The list is composed of the base product columns, the names of the fields
+  * provided for the entity class and, for the products export type only, the
+  * currency fields.
+  */
+ private function getFeedColumns(string $exportType, string $entityClass): array
+ {
+     $providerFields = $this->fieldProviders->getFields($entityClass);
+     $providerColumns = array_map([$this, 'getFieldName'], $providerFields);
+
+     $currencyColumns = [];
+     if ($exportType === self::PRODUCTS_EXPORT_TYPE) {
+         $currencyColumns = $this->currencyFieldsProvider->getCurrencyFields();
+     }
+
+     $allColumns = array_merge($this->productsColumnsBase, $providerColumns, $currencyColumns);
+
+     return array_values(array_unique($allColumns));
+ }
+
+ /**
+  * Returns the name of the given field; used as the array_map callback in getFeedColumns().
+  */
+ private function getFieldName(FieldInterface $field): string
+ {
+     $columnName = $field->getName();
+
+     return $columnName;
+ }
+}
diff --git a/src/Command/WorkerDataExportCommand.php b/src/Command/WorkerDataExportCommand.php
new file mode 100644
index 00000000..b2de6c0e
--- /dev/null
+++ b/src/Command/WorkerDataExportCommand.php
@@ -0,0 +1,314 @@
+file = tmpfile();
+ parent::__construct();
+ }
+
+ /**
+  * Returns the built-in mapping of export type name to exported entity class.
+  */
+ public function getBaseTypeEntityMap(): array
+ {
+     $typeEntityMap = [];
+     $typeEntityMap[self::PRODUCTS_EXPORT_TYPE] = ProductEntity::class;
+     $typeEntityMap[self::BRANDS_EXPORT_TYPE] = BrandEntity::class;
+     $typeEntityMap[self::CMS_EXPORT_TYPE] = CmsPageEntity::class;
+     $typeEntityMap[self::CATEGORIES_EXPORT_TYPE] = CategoryEntity::class;
+
+     return $typeEntityMap;
+ }
+
+ /**
+  * Returns the export type to entity class mapping used by this command.
+  *
+  * Currently this is just the base mapping passed through array_merge().
+  */
+ public function getTypeEntityMap(): array
+ {
+     $baseMap = $this->getBaseTypeEntityMap();
+
+     return array_merge($baseMap);
+ }
+
+ /**
+  * Registers the command name, description, the upload/import options and the
+  * optional export type, sales channel and language arguments.
+  */
+ public function configure(): void
+ {
+     $availableTypes = implode(', ', array_keys($this->getTypeEntityMap()));
+
+     $this
+         ->setName('factfinder:product-data:export')
+         ->setDescription('Allows to export feed for products in queue method')
+         ->addOption(self::UPLOAD_FEED_OPTION, 'u', InputOption::VALUE_NONE, 'Should upload after exporting')
+         ->addOption(self::PUSH_IMPORT_OPTION, 'i', InputOption::VALUE_NONE, 'Should import after uploading')
+         ->addArgument(
+             self::EXPORT_TYPE_ARGUMENT,
+             InputArgument::OPTIONAL,
+             sprintf('Set data export type(%s)', $availableTypes)
+         )
+         ->addArgument(self::SALES_CHANNEL_ARGUMENT, InputArgument::OPTIONAL, 'ID of the sales channel')
+         ->addArgument(
+             self::SALES_CHANNEL_LANGUAGE_ARGUMENT,
+             InputArgument::OPTIONAL,
+             'ID of the sales channel language'
+         );
+ }
+
+ /**
+  * Exports the feed for the chosen export type and optionally uploads it and triggers an import.
+  *
+  * In interactive mode the export type, sales channel, language and the
+  * save/upload/import flags are asked for; otherwise they are read from the
+  * command arguments and options (the export type then defaults to products
+  * and the export is never kept as a local file).
+  *
+  * Products are exported by running the `factfinder:export:batch` command in
+  * separate processes, one per batch; all other export types are generated
+  * in-process by the feed service.
+  *
+  * @return int Command::SUCCESS, or Command::FAILURE when a batch process fails
+  *             or its output does not contain a usable result
+  *
+  * @SuppressWarnings(PHPMD.UnusedPrivateMethod)
+  * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
+  * @SuppressWarnings(PHPMD.CyclomaticComplexity)
+  * @SuppressWarnings(PHPMD.NPathComplexity)
+  * @SuppressWarnings(PHPMD.ExcessiveMethodLength)
+  */
+ public function execute(InputInterface $input, OutputInterface $output): int
+ {
+     $saveFile = false;
+
+     if ($input->isInteractive()) {
+         $helper = $this->getHelper('question');
+
+         $exportTypeQuestion = $this->getChoiceQuestion(sprintf('Select data export type (default - %s)', self::PRODUCTS_EXPORT_TYPE), array_keys($this->getTypeEntityMap()), 'Invalid option %s', 0);
+         $exportType = $helper->ask($input, $output, $exportTypeQuestion);
+
+         $salesChannel = $this->getSalesChannel($helper->ask($input, $output, new Question('ID of the sales channel (leave empty if no value): ')));
+         $language = $this->getLanguage($helper->ask($input, $output, new Question('ID of the sales channel language (leave empty if no value): ')));
+
+         // The yes/no answers are mapped back to their choice index (no => 0, yes => 1) and cast to bool.
+         $saveFileQuestion = $this->getChoiceQuestion('Save export to local file? (default - no): ', ['no', 'yes'], 'Invalid option %s', 0);
+         $saveFile = (bool) array_flip($saveFileQuestion->getChoices())[$helper->ask($input, $output, $saveFileQuestion)];
+
+         $uploadFeedQuestion = $this->getChoiceQuestion('Should upload after exporting? (default - no): ', ['no', 'yes'], 'Invalid option %s', 0);
+         $uploadFeed = (bool) array_flip($uploadFeedQuestion->getChoices())[$helper->ask($input, $output, $uploadFeedQuestion)];
+
+         $pushImportQuestion = $this->getChoiceQuestion('Should import after uploading? (default - no): ', ['no', 'yes'], 'Invalid option %s', 0);
+         $pushImport = (bool) array_flip($pushImportQuestion->getChoices())[$helper->ask($input, $output, $pushImportQuestion)];
+     } else {
+         $salesChannel = $this->getSalesChannel($input->getArgument(self::SALES_CHANNEL_ARGUMENT));
+         $language = $this->getLanguage($input->getArgument(self::SALES_CHANNEL_LANGUAGE_ARGUMENT));
+         $exportType = $input->getArgument(self::EXPORT_TYPE_ARGUMENT) ?? self::PRODUCTS_EXPORT_TYPE;
+         $uploadFeed = $input->getOption(self::UPLOAD_FEED_OPTION);
+         $pushImport = $input->getOption(self::PUSH_IMPORT_OPTION);
+     }
+
+     $context = $this->channelService->getSalesChannelContext($salesChannel, $language->getId());
+     $entityClass = $this->getExportedEntityClass($exportType);
+     $feedColumns = $this->getFeedColumns($exportType, $entityClass);
+
+     $needFile = $saveFile || $uploadFeed;
+
+     if ($needFile) {
+         // Replace the temporary file opened in the constructor with a named feed file.
+         fclose($this->file);
+         $this->createFile($exportType, $context->getSalesChannelId());
+     }
+
+     $filePath = stream_get_meta_data($this->file)['uri'];
+
+     if ($exportType === self::PRODUCTS_EXPORT_TYPE) {
+         $output->writeln('Start generate data feed file by queue system:');
+
+         if (!$this->exportProductsInBatches($output, $salesChannel, $language, $filePath)) {
+             return Command::FAILURE;
+         }
+
+         if (!$needFile) {
+             $output->write(file_get_contents($filePath));
+         }
+     } else {
+         // Old flow for CMS, CATEGORY and BRANDS
+         $feedService = $this->feedFactory->create($context, $entityClass);
+         $out = $needFile ? new CsvFile($this->file) : new ConsoleOutput($output);
+         $feedService->generate($out, $feedColumns);
+     }
+
+     $output->writeln('Generate export data feed file completed');
+
+     if ($uploadFeed) {
+         $this->uploadService->upload($this->file);
+     }
+
+     if ($pushImport) {
+         $this->pushImportService->execute();
+     }
+
+     if (!$saveFile && $this->file) {
+         $metaData = stream_get_meta_data($this->file);
+
+         if (file_exists($metaData['uri'])) {
+             unlink($metaData['uri']);
+         }
+     }
+
+     return Command::SUCCESS;
+ }
+
+ /**
+  * Runs the batch export command in separate processes until a batch reports zero processed entities.
+  *
+  * Each process appends its rows to $filePath and prints a JSON object whose
+  * "count" entry is the number of entities it processed. The same object also
+  * carries "memory" and "peak" (MB), which can be logged here when debugging
+  * the memory consumption of individual batches.
+  *
+  * @return bool true when all batches finished, false when a process failed or
+  *              its output contained no usable "count"
+  */
+ private function exportProductsInBatches(
+     OutputInterface $output,
+     ?SalesChannelEntity $salesChannel,
+     LanguageEntity $language,
+     string $filePath
+ ): bool {
+     $phpBinary = (new PhpExecutableFinder())->find() ?: 'php';
+     $batchSize = 100;
+     $offset = 0;
+
+     while (true) {
+         // The project directory is used as working directory so that the relative
+         // "bin/console" path resolves no matter where this command was started from.
+         // NOTE(review): the worker is always started with --env=prod, regardless of
+         // the environment of this command - confirm this is intended.
+         $process = new Process([
+             $phpBinary,
+             'bin/console',
+             'factfinder:export:batch',
+             $salesChannel ? $salesChannel->getId() : '',
+             $language->getId(),
+             (string) $offset,
+             (string) $batchSize,
+             $filePath,
+             '--env=prod',
+         ], $this->kernelProjectDir);
+
+         // 10 minutes for big batch size (with a lot of variants)
+         $process->setTimeout(600);
+         $process->run();
+
+         if (!$process->isSuccessful()) {
+             $output->writeln("Error during generate export (Offset: $offset).");
+             $output->writeln($process->getErrorOutput());
+
+             return false;
+         }
+
+         // The JSON result may be surrounded by other output, so it is extracted first.
+         $rawOutput = trim($process->getOutput());
+         $result = preg_match('/\{.*\}/', $rawOutput, $matches) === 1
+             ? json_decode($matches[0], true)
+             : null;
+
+         // A missing result must not be mistaken for "no more products": that would
+         // silently end the export with an incomplete feed.
+         if (!is_array($result) || !isset($result['count'])) {
+             $output->writeln("Unexpected output from export batch (Offset: $offset).");
+             $output->writeln($rawOutput);
+
+             return false;
+         }
+
+         if ((int) $result['count'] === 0) {
+             break;
+         }
+
+         $offset += $batchSize;
+     }
+
+     return true;
+ }
+
+ /**
+  * Loads the sales channel with the given ID.
+  *
+  * @param string|null $id sales channel ID; null or an empty string means "no sales channel"
+  *
+  * @return SalesChannelEntity|null the first search result, or null when no ID was given
+  */
+ private function getSalesChannel(?string $id = null): ?SalesChannelEntity
+ {
+     // An empty value is not an ID and must not be used as a search criterion.
+     if ($id === null || $id === '') {
+         return null;
+     }
+
+     return $this->channelRepository->search(new Criteria([$id]), new Context(new SystemSource()))->first();
+ }
+
+ /**
+  * Loads the language with the given ID, falling back to the system language when no ID is given.
+  *
+  * @param string|null $id language ID; null or an empty value selects Defaults::LANGUAGE_SYSTEM
+  *
+  * @throws \InvalidArgumentException when no language with the resolved ID is found
+  */
+ private function getLanguage(?string $id = null): LanguageEntity
+ {
+     $languageId = $id ?: Defaults::LANGUAGE_SYSTEM;
+     $language = $this->languageRepository->search(
+         new Criteria([$languageId]),
+         new Context(new SystemSource())
+     )->first();
+
+     // Fail with a clear message instead of a return type error when nothing was found.
+     if (!$language instanceof LanguageEntity) {
+         throw new \InvalidArgumentException(sprintf('Language with ID "%s" does not exist', $languageId));
+     }
+
+     return $language;
+ }
+
+ /**
+  * Builds the de-duplicated, re-indexed list of feed column names.
+  *
+  * The list is composed of the base product columns, the names of the fields
+  * provided for the entity class and, for the products export type only, the
+  * currency fields.
+  */
+ private function getFeedColumns(string $exportType, string $entityClass): array
+ {
+     $providerFields = $this->fieldProviders->getFields($entityClass);
+     $providerColumns = array_map([$this, 'getFieldName'], $providerFields);
+
+     $currencyColumns = [];
+     if ($exportType === self::PRODUCTS_EXPORT_TYPE) {
+         $currencyColumns = $this->currencyFieldsProvider->getCurrencyFields();
+     }
+
+     $allColumns = array_merge($this->productsColumnsBase, $providerColumns, $currencyColumns);
+
+     return array_values(array_unique($allColumns));
+ }
+
+ /**
+  * Returns the name of the given field; used as the array_map callback in getFeedColumns().
+  */
+ private function getFieldName(FieldInterface $field): string
+ {
+     $columnName = $field->getName();
+
+     return $columnName;
+ }
+
+ /**
+  * Creates a choice question with the given choices, default value and error message.
+  */
+ private function getChoiceQuestion(string $question, array $choices, string $errorMessage, $defaultValue = null): ChoiceQuestion
+ {
+     $choiceQuestion = new ChoiceQuestion($question, $choices, $defaultValue);
+
+     return $choiceQuestion->setErrorMessage($errorMessage);
+ }
+
+ /**
+  * Resolves the entity class registered for the given export type.
+  *
+  * @throws \Exception when the export type is not part of the type map
+  */
+ private function getExportedEntityClass(string $exportType): string
+ {
+     $knownTypes = $this->getTypeEntityMap();
+
+     if (!isset($knownTypes[$exportType])) {
+         throw new \Exception('Unknown export type');
+     }
+
+     return $knownTypes[$exportType];
+ }
+
+ /**
+  * Creates (or truncates) the feed file for the given export type and sales
+  * channel in "<project dir>/var/factfinder" and stores its handle in $this->file.
+  *
+  * The file name is built by ChannelTypeNamingStrategy from the export type and
+  * the channel returned by the communication configuration for the sales channel.
+  *
+  * @return resource the opened file handle
+  *
+  * @throws \Exception when the directory cannot be created or written to, or
+  *                    when the file cannot be opened
+  */
+ private function createFile(string $exportType, string $salesChannelId)
+ {
+     $dir = $this->kernelProjectDir . '/var/factfinder';
+
+     // Create missing parent directories as well; the second is_dir() check covers
+     // the case where the directory was created concurrently.
+     if (!is_dir($dir) && !mkdir($dir, 0777, true) && !is_dir($dir)) {
+         throw new \Exception('Directory ' . $dir . ' could not be created. Aborting');
+     }
+
+     if (!is_writable($dir)) {
+         throw new \Exception('Directory ' . $dir . ' is not writable. Aborting');
+     }
+
+     $channelId = $this->communication->getChannel($salesChannelId);
+     $filename = $dir . DIRECTORY_SEPARATOR . (new ChannelTypeNamingStrategy())->createFeedFileName($exportType, $channelId);
+     $file = fopen($filename, 'w+');
+
+     // Do not keep "false" as the handle: later stream calls would fail far from the cause.
+     if ($file === false) {
+         throw new \Exception('File ' . $filename . ' could not be opened for writing. Aborting');
+     }
+
+     $this->file = $file;
+
+     return $this->file;
+ }
+}
diff --git a/src/Export/ExportProducts.php b/src/Export/ExportProducts.php
index 974e7551..e9b8f3b4 100644
--- a/src/Export/ExportProducts.php
+++ b/src/Export/ExportProducts.php
@@ -34,15 +34,30 @@ public function getByContext(SalesChannelContext $context, int $batchSize = 100)
}
}
+ /**
+  * Yields the products of a single page of the export.
+  *
+  * The pages are requested one at a time by separate calls, so the result is
+  * sorted by ID: without a fixed order the rows of a page are not guaranteed to
+  * be stable between queries and products could be skipped or exported twice.
+  *
+  * @param int $limit  maximum number of products in the page
+  * @param int $offset number of products to skip
+  */
+ public function getBatchByContext(SalesChannelContext $context, int $limit, int $offset): iterable
+ {
+     $criteria = $this->getCriteria($limit, $offset);
+     $criteria->addSorting(
+         new \Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting(
+             'id',
+             \Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting::ASCENDING
+         )
+     );
+     $products = $this->productRepository->search($criteria, $context);
+
+     foreach ($products->getElements() as $product) {
+         yield $product;
+     }
+ }
+
public function getProducedExportEntityType(): string
{
return ExportProductEntity::class;
}
- private function getCriteria(int $batchSize): Criteria
+ private function getCriteria(int $batchSize, ?int $offset = null): Criteria
{
$criteria = new Criteria();
$criteria->setLimit($batchSize);
+
+ if ($offset) {
+ $criteria->setOffset($offset);
+ }
+
$criteria->addAssociation('categories');
$criteria->addAssociation('categoriesRo');
$criteria->addAssociation('children.options.group');
@@ -53,9 +68,11 @@ private function getCriteria(int $batchSize): Criteria
$criteria->addAssociation('seoUrls');
$criteria->addAssociation('media');
$criteria->addAssociation('children.cover.media');
+
foreach ($this->customAssociations as $association) {
$criteria->addAssociation($association);
}
+
$criteria->addFilter(new EqualsFilter('parentId', null));
return $criteria;
diff --git a/src/Export/Feed.php b/src/Export/Feed.php
index b9047885..0018761c 100644
--- a/src/Export/Feed.php
+++ b/src/Export/Feed.php
@@ -42,6 +42,31 @@ public function getEntities(): iterable
}
}
+ /**
+  * Writes one batch of export entities to the stream and returns how many were written.
+  *
+  * Every entity is reduced to the given columns; columns it does not provide
+  * are written as empty strings, and each row is passed through prepare().
+  *
+  * @param bool $writeHeaders whether the column names are written as the first row
+  */
+ public function generateBatch(StreamInterface $stream, array $columns, int $offset, int $limit, bool $writeHeaders): int
+ {
+     if ($writeHeaders) {
+         $stream->addEntity($columns);
+     }
+
+     // Template row: every column present, in column order, with an empty value.
+     $blankRow = array_fill_keys($columns, '');
+     $written = 0;
+
+     foreach ($this->getEntitiesForBatch($offset, $limit) as $exportEntity) {
+         $knownValues = array_intersect_key($exportEntity->toArray(), $blankRow);
+         $row = array_merge($blankRow, $knownValues);
+         $stream->addEntity($this->prepare($row));
+         ++$written;
+     }
+
+     return $written;
+ }
+
+ /**
+  * Yields the export entities created by the composite factory for every
+  * entity the exporter returns for the given offset and limit.
+  */
+ public function getEntitiesForBatch(int $offset, int $limit): iterable
+ {
+     $batch = $this->exporter->getBatchByContext($this->context, $limit, $offset);
+
+     foreach ($batch as $sourceEntity) {
+         $producedType = $this->exporter->getProducedExportEntityType();
+
+         yield from $this->compositeFactory->createEntities($sourceEntity, $producedType);
+     }
+ }
+
private function prepare(array $data): array
{
return array_map([$this->filter, 'filterValue'], $data);