Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions lib/CsvImportValidator.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public function validate()
// Iterate csv rows, calling each test/row.
foreach ($this->rows as $row) {
if ($this->showDisplayProgress) {
echo $this->renderProgressDescription();
$this->renderProgressDescription();
}

$this->validatorCollection->testRow($this->header, $row);
Expand All @@ -167,7 +167,7 @@ public function validate()
}

if ($this->showDisplayProgress) {
echo $this->renderProgressDescription(true);
$this->renderProgressDescription(true);
}

return $this->resultCollection;
Expand Down Expand Up @@ -365,13 +365,51 @@ public static function validateFilename($filename)

public function renderProgressDescription(bool $complete = false)
{
$output = '.';
// Periodic single-line summaries to STDERR; final summary to STDOUT when complete.
static $startTime = null;
static $lastLogTime = null;
static $processedCount = 0;

if ($complete) {
return "\nAnalysis complete.\n";
if (null === $startTime) {
return ''; // Nothing processed
}

$totalDuration = microtime(true) - $startTime;
$finalRate = $processedCount / max($totalDuration, 1e-9);
echo sprintf(
"\rProcessed %d rows total in %.2fs (%.1f/s)\n",
$processedCount,
$totalDuration,
$finalRate
);

// Reset for potential reuse; no stdout/stderr output
$startTime = null;
$lastLogTime = null;
$processedCount = 0;

return '';
}

if (null === $startTime) {
$startTime = microtime(true);
$lastLogTime = $startTime;
}

++$processedCount;

$now = microtime(true);
if ($now - $lastLogTime >= 5) {
// Ensure elapsed is never zero (avoid div by zero in rate calc below)
$elapsed = max($now - $startTime, 1e-9);
$rate = $processedCount / $elapsed;
fwrite(STDERR, sprintf("\rProcessed %d rows (%.1f/s)", $processedCount, $rate));
fflush(STDERR);
$lastLogTime = $now;
}

return $output;
return '';
}

protected function getLongestRow(): int
Expand Down
2 changes: 1 addition & 1 deletion lib/QubitCsvImport.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public function import($csvFile, $type = null, $csvOrigFileName = null)
}

// Warnings
$this->errors = $output;
$this->errors = isset($output) ? $output : [];

return $this;
}
Expand Down
67 changes: 47 additions & 20 deletions lib/QubitFlatfileImport.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,13 @@ public function logError($message, $includeCurrentRowNumber = true)
{
$message = ($includeCurrentRowNumber) ? sprintf("Row %d: %s\n", $this->getStatus('rows') + 1, $message) : $message;

// If a carriage-return progress line is active on STDERR, break it so
// subsequent STDOUT messages start on a new line.
if ($this->displayProgress) {
fwrite(STDERR, "\r");
fflush(STDERR);
}

if ($this->errorLog) {
file_put_contents($this->errorLog, $message, FILE_APPEND);
}
Expand Down Expand Up @@ -413,7 +420,7 @@ public function csv($fh, $skipRows = 0)
++$this->status['rows'];

if ($this->displayProgress) {
echo $this->renderProgressDescription();
$this->renderProgressDescription();
}
} else {
++$this->status['rows'];
Expand All @@ -424,6 +431,25 @@ public function csv($fh, $skipRows = 0)
$this->stopTimer();
}

if ($this->displayProgress) {
fwrite(STDERR, "\r");
fflush(STDERR);
$this->progressLineActive = false;
}

// Final summary to STDOUT for logs
$rowsProcessed = $this->getStatus('rows') - $this->getStatus('skippedRows');
$totalDuration = $this->getTimeElapsed();
// Ensure total duration is never zero (avoid div by zero in rate calc below)
$finalRate = $rowsProcessed / max($totalDuration, 1e-9);
$msg = sprintf(
"Processed %d rows total in %.2fs (%.1f/s)\n",
$rowsProcessed,
$totalDuration,
$finalRate
);
echo $this->logError($msg, false);

if ($this->status['duplicates']) {
$msg = sprintf('Duplicates found: %d', $this->status['duplicates']);
echo $this->logError($msg, false);
Expand Down Expand Up @@ -547,30 +573,30 @@ public function isUpdating()
*/
public function renderProgressDescription()
{
$output = '.';
// Periodic single-line summaries to STDERR.
static $startTime = null;
static $lastLogTime = null;
static $processedCount = 0;

// return empty string if no intermittant progress display
if (
!isset($this->rowsUntilProgressDisplay)
|| !$this->rowsUntilProgressDisplay
) {
return $output;
if (null === $startTime) {
$startTime = microtime(true);
$lastLogTime = $startTime;
}
// row count isn't incremented until after this is displayed, so add one to reflect reality
$rowsProcessed = $this->getStatus('rows') - $this->getStatus('skippedRows');
$memoryUsageMB = round(memory_get_usage() / (1024 * 1024), 2);

// if this show should be displayed, display it
if (!($rowsProcessed % $this->rowsUntilProgressDisplay)) {
$elapsed = $this->getTimeElapsed();
$elapsedMinutes = round($elapsed / 60, 2);
$averageTime = round($elapsed / $rowsProcessed, 2);
++$processedCount;

$output .= "\n".$rowsProcessed.' rows processed in '.$elapsedMinutes
.' minutes ('.$averageTime.' second/row average, '.$memoryUsageMB." MB used).\n";
$now = microtime(true);
if ($now - $lastLogTime >= 5) {
// Ensure elapsed is never zero (avoid div by zero in rate calc below)
$elapsed = max($now - $startTime, 1e-9);
$rate = $processedCount / $elapsed;
$memoryUsageMB = round(memory_get_usage() / (1024 * 1024), 2);
fwrite(STDERR, sprintf("\rProcessed %d rows (%.1f/s, %.2f MB)", $processedCount, $rate, $memoryUsageMB));
fflush(STDERR);
$lastLogTime = $now;
}

return $output;
return '';
}

/*
Expand Down Expand Up @@ -1498,7 +1524,8 @@ protected function startTimer()
*/
protected function stopTimer()
{
$this->timer->stop();
// Record elapsed time into the timer's total so elapsed() reports correctly.
$this->timer->add(false);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions lib/task/export/csvExportAccessionsTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public function execute($arguments = [], $options = [])

$writer->exportResource($accessionRecord);

$this->indicateProgress($options['items-until-update']);
++$itemsExported;
$this->indicateProgress(++$itemsExported);
}

$this->log('');
Expand Down
4 changes: 1 addition & 3 deletions lib/task/export/csvExportBulkTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,7 @@ public function execute($arguments = [], $options = [])

$writer->exportResource($resource);

$this->indicateProgress($options['items-until-update']);

++$itemsExported;
$this->indicateProgress(++$itemsExported);

if (0 === ($itemsExported % 1000)) {
Qubit::clearClassCaches();
Expand Down
4 changes: 1 addition & 3 deletions lib/task/export/csvExportInformationObjectsTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ public function execute($arguments = [], $options = [])

$writer->exportResource($resource);

$this->indicateProgress($options['items-until-update']);

++$itemsExported;
$this->indicateProgress(++$itemsExported);
}

echo "\nExport complete (".$itemsExported." descriptions exported).\n";
Expand Down
12 changes: 3 additions & 9 deletions lib/task/export/csvExportTermUsageTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,12 @@ class csvExportTermUsageTask extends exportBulkBaseTask
*/
public function execute($arguments = [], $options = [])
{
if (isset($options['items-until-update']) && !ctype_digit($options['items-until-update'])) {
throw new sfException('items-until-update must be a number');
}

$configuration = ProjectConfiguration::getApplicationConfiguration('qubit', 'cli', false);
$sf_context = sfContext::createInstance($configuration);
$conn = $this->getDatabaseConnection();

$this->exportFileReplacePrompt($arguments['path']);
$itemsExported = $this->exportToCsv($this->determineTaxonomyId($options), $arguments['path'], $options['items-until-update']);
$itemsExported = $this->exportToCsv($this->determineTaxonomyId($options), $arguments['path']);

if ($itemsExported) {
$this->log(sprintf("\nExport complete (%d terms exported).", $itemsExported));
Expand Down Expand Up @@ -113,7 +109,7 @@ private function exportFileReplacePrompt($exportPath)
}
}

private function exportToCsv($taxonomyId, $exportPath, $rowsUntilUpdate)
private function exportToCsv($taxonomyId, $exportPath)
{
$itemsExported = 0;

Expand Down Expand Up @@ -141,9 +137,7 @@ private function exportToCsv($taxonomyId, $exportPath, $rowsUntilUpdate)
$writer->setColumn('use_count', $row->use_count);
$writer->exportResource($resource);

$this->indicateProgress($rowsUntilUpdate);

++$itemsExported;
$this->indicateProgress(++$itemsExported);
}
}

Expand Down
4 changes: 1 addition & 3 deletions lib/task/export/eacExportTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public function execute($arguments = [], $options = [])
}

file_put_contents($filePath, $xml);
$this->indicateProgress($options['items-until-update']);
++$itemsExported;
$this->indicateProgress(++$itemsExported);
} else {
$this->log("{$filePath} already exists, skipping...");
}
Expand All @@ -102,7 +101,6 @@ protected function configure()
new sfCommandOption('application', null, sfCommandOption::PARAMETER_OPTIONAL, 'The application name', 'qubit'),
new sfCommandOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'cli'),
new sfCommandOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel'),
new sfCommandOption('items-until-update', null, sfCommandOption::PARAMETER_OPTIONAL, 'Indicate progress every n items.'),
new sfCommandOption('criteria', null, sfCommandOption::PARAMETER_OPTIONAL, 'Export criteria'),
]);
}
Expand Down
3 changes: 1 addition & 2 deletions lib/task/export/exportAuthorityRecordsTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public function execute($arguments = [], $options = [])

$writer->exportResource($actor);

$this->indicateProgress($options['items-until-update']);
++$itemsExported;
$this->indicateProgress(++$itemsExported);
}

$this->log('');
Expand Down
22 changes: 17 additions & 5 deletions lib/task/export/exportBulkBaseTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ protected function addCoreArgumentsAndOptions()
new sfCommandOption('application', null, sfCommandOption::PARAMETER_OPTIONAL, 'The application name', 'qubit'),
new sfCommandOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'cli'),
new sfCommandOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel'),
new sfCommandOption('items-until-update', null, sfCommandOption::PARAMETER_OPTIONAL, 'Indicate progress every n items.'),
]);
}

Expand Down Expand Up @@ -233,11 +232,24 @@ protected function getDatabaseConnection()
return $databaseManager->getDatabase('propel')->getConnection();
}

protected function indicateProgress($itemsUntilUpdate)
protected function indicateProgress(int $processedCount = 0)
{
// If progress indicator should be displayed, display it
if (!isset($itemsUntilUpdate) || !($itemsExported % $itemsUntilUpdate)) {
echo '.';
// Periodic single-line summaries to STDERR; keep signature for BC
static $startTime = null;
static $lastLogTime = null;

if (null === $startTime) {
$startTime = microtime(true);
$lastLogTime = $startTime;
}

$now = microtime(true);
if ($now - $lastLogTime >= 5) {
$elapsed = max($now - $startTime, 1e-9);
$rate = $processedCount / $elapsed;
fwrite(STDERR, sprintf("\rProcessed %d items (%.1f/s)", $processedCount, $rate));
fflush(STDERR);
$lastLogTime = $now;
}
}
}
4 changes: 2 additions & 2 deletions lib/task/export/exportBulkTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public function execute($arguments = [], $options = [])
throw new sfException("Cannot write to path: {$filePath}");
}

$this->indicateProgress($options['items-until-update']);
$this->indicateProgress(++$itemsExported);

if (0 == $itemsExported++ % 1000) {
if (0 == $itemsExported % 1000) {
Qubit::clearClassCaches();
}
}
Expand Down
17 changes: 10 additions & 7 deletions lib/task/import/importBulkTask.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,12 @@ protected function execute($arguments = [], $options = [])
rename($file, $move_destination);
}

if (!$options['verbose']) {
echo '.';
}

if ($importer->hasErrors()) {
foreach ($importer->getErrors() as $message) {
$this->log('('.$file.'): '.$message);
}
}

echo '.';

// Try to free up memory
unset($importer);

Expand Down Expand Up @@ -176,7 +170,16 @@ protected function execute($arguments = [], $options = [])
QubitSearch::getInstance()->optimize();
}

$this->log("\nImported ".$count.' XML/CSV files in '.$timer->elapsed().' s. '.memory_get_peak_usage().' bytes used.');
if (!empty($options['verbose'])) {
$elapsed = $timer->elapsed();
$peakMb = round(memory_get_peak_usage() / 1048576, 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a constant instead of 1048576? Or use 1024 * 1024, for easier comprehension and for consistency with what QubitFlatfileImport does?

$this->log(sprintf(
'Imported %d XML/CSV files in %.2fs, peak %.2f MB',
$count,
$elapsed,
$peakMb
));
}
}

protected function dir_tree($dir)
Expand Down
Loading
Loading