Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Backward Compatibility Breaks
### Added
* Added support for numeric `minimum_should_match` values in `TermsSet` query [#2293](https://github.com/ruflin/Elastica/pull/2293)
* Added support for "search after" based pagination [#1645](https://github.com/ruflin/Elastica/issues/1645)
### Changed
### Deprecated
### Removed
Expand Down
12 changes: 12 additions & 0 deletions src/Document.php
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ public function hasPipeline(): bool
return $this->hasParam('pipeline');
}

/**
 * Returns the sort position of this document, i.e. the value stored under its "sort" parameter.
 *
 * The returned value can be handed to {@see \Elastica\Query::setSearchAfter()} as the starting offset
 * for fetching the next page of hits.
 *
 * @return array sort values of this document
 */
public function getSort(): array
{
    $sortPosition = $this->getParam('sort');

    return $sortPosition;
}

/**
* Returns the document as an array.
*/
Expand Down
82 changes: 82 additions & 0 deletions src/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,88 @@ public function count($query = ''): int
return $search->count('', false);
}

/**
 * Iterates over all documents matching the given query, one document at a time,
 * using "search after" based pagination.
 *
 * The method works on a clone of the query object built from `$query`, sets its size to `$batchSize`
 * and repeats the search, each time continuing after the sort position of the last document received,
 * until a page comes back with fewer than `$batchSize` documents.
 *
 * Because this is a generator, the argument validation below runs when iteration starts,
 * not when the method is called.
 *
 * @see \Elastica\Query::setSearchAfter()
 *
 * @param mixed      $query     search query with sort by unique document field
 * @param int        $batchSize the number of rows to be returned in each batch (e.g. each query size)
 * @param array|null $options   search request options
 *
 * @throws InvalidException if the batch size is lower than 1 or the query has no "sort" parameter
 *
 * @return \Generator|\Elastica\Document[] list of all documents matched the given query as iterator
 */
public function each($query = '', int $batchSize = 100, ?array $options = null): \Generator
{
    if ($batchSize < 1) {
        throw new InvalidException('Batch size must be greater than 0.');
    }

    // Clone so that the size / search_after changes below stay local to this iteration.
    $query = clone Query::create($query);

    if (!$query->hasParam('sort')) {
        throw new InvalidException('Query must have "sort" parameter in order to use "search after" based iteration.');
    }

    $query->setSize($batchSize);

    while (true) {
        // Fetch the page's documents once: they are needed for yielding, for the size check and for the cursor.
        $documents = $this->search($query, $options)->getDocuments();

        $lastDocument = null;
        foreach ($documents as $document) {
            $lastDocument = $document;

            yield $document;
        }

        // A short (or empty) page means there is nothing left to fetch.
        if (null === $lastDocument || count($documents) < $batchSize) {
            break;
        }

        // Continue right after the last document of the page that was just consumed.
        $query->setSearchAfter($lastDocument->getSort());
    }
}

/**
 * Walks through every document matching the given query page by page,
 * relying on "search after" based pagination.
 *
 * Each yielded value is the array of documents of one search call. Iteration ends on the first empty page
 * or right after a page holding fewer than `$batchSize` documents.
 *
 * @see \Elastica\Query::setSearchAfter()
 *
 * @param mixed      $query     search query with sort by unique document field
 * @param int        $batchSize the number of rows to be returned in each batch (e.g. each query size)
 * @param array|null $options   search request options
 *
 * @return \Generator|\Elastica\Document[][] list of document batches matched the given query as iterator
 */
public function batch($query = '', int $batchSize = 100, ?array $options = null): \Generator
{
    if ($batchSize < 1) {
        throw new InvalidException('Batch size must be greater than 0.');
    }

    // Work on a copy: size and search_after are overwritten below.
    $pagedQuery = clone Query::create($query);

    if (!$pagedQuery->hasParam('sort')) {
        throw new InvalidException('Query must have "sort" parameter in order to use "search after" based iteration.');
    }

    $pagedQuery->setSize($batchSize);

    for (;;) {
        $page = $this->search($pagedQuery, $options)->getDocuments();
        $pageSize = count($page);

        if (0 === $pageSize) {
            return;
        }

        yield $page;

        // A page that is not full is the last one.
        if ($pageSize < $batchSize) {
            return;
        }

        // The sort position of the page's last document is the cursor for the next search.
        $cursorDocument = array_pop($page);
        $pagedQuery->setSearchAfter($cursorDocument->getSort());
    }
}

/**
* Opens an index.
*
Expand Down
16 changes: 16 additions & 0 deletions src/Query.php
Original file line number Diff line number Diff line change
Expand Up @@ -476,4 +476,20 @@ public function setTrackTotalHits($trackTotalHits = true): self

return $this->setParam('track_total_hits', $trackTotalHits);
}

/**
 * Sets the "search_after" parameter, so the search returns the hits that follow the given sort position.
 *
 * Pass the value of {@see \Elastica\Document::getSort()} taken from the last document of the previous page.
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#search-after
 *
 * @param array $searchAfter the sort of the last document from previous search result set
 *
 * @return static self reference
 */
public function setSearchAfter(array $searchAfter): self
{
    return $this->setParam('search_after', $searchAfter);
}
}
63 changes: 63 additions & 0 deletions tests/IndexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Elastica\Document;
use Elastica\Index;
use Elastica\Mapping;
use Elastica\Query;
use Elastica\Query\QueryString;
use Elastica\Query\SimpleQueryString;
use Elastica\Query\Term;
Expand Down Expand Up @@ -856,4 +857,66 @@ public function testGetEmptyAliases(): void

$this->assertEquals([], $index->getAliases());
}

#[Group('functional')]
public function testIterateEach(): void
{
    $index = $this->_createIndex();
    $index->setMapping(new Mapping([
        'country_id' => ['type' => 'integer'],
        'region_id' => ['type' => 'integer'],
    ]));

    // Three documents with a batch size of two force a second "search after" request.
    $fixtures = [
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ];
    $index->addDocuments($fixtures);
    $index->refresh();

    $sortedQuery = new Query();
    $sortedQuery->setSize(2);
    $sortedQuery->setSort([
        'region_id' => 'desc',
    ]);

    // Drain the generator, discarding its keys, then map every document to its ID.
    $iterated = \iterator_to_array($index->each($sortedQuery, 2), false);
    $seenIds = \array_map(
        static fn (Document $hit) => $hit->getId(),
        $iterated
    );

    $this->assertEquals(['3', '2', '1'], $seenIds);
}

#[Group('functional')]
public function testIterateBatch(): void
{
    $index = $this->_createIndex();
    $index->setMapping(new Mapping([
        'country_id' => ['type' => 'integer'],
        'region_id' => ['type' => 'integer'],
    ]));

    // Three documents with a batch size of two: a full batch followed by a partial one is expected.
    $fixtures = [
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ];
    $index->addDocuments($fixtures);
    $index->refresh();

    $sortedQuery = new Query();
    $sortedQuery->setSort([
        'region_id' => 'desc',
    ]);
    $sortedQuery->setSize(2);

    // Flatten the batches into a single list of IDs, keeping the iteration order.
    $seenIds = [];
    foreach ($index->batch($sortedQuery, 2) as $batchOfHits) {
        foreach ($batchOfHits as $hit) {
            $seenIds[] = $hit->getId();
        }
    }

    $this->assertEquals(['3', '2', '1'], $seenIds);
}
}
40 changes: 40 additions & 0 deletions tests/QueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,44 @@ public function testSetTrackTotalHits(): void
$this->assertEquals(25, $resultSet->getTotalHits());
$this->assertEquals('gte', $resultSet->getTotalHitsRelation());
}

#[Group('functional')]
public function testSearchAfter(): void
{
    $index = $this->_createIndex();
    $index->setMapping(new Mapping([
        'country_id' => ['type' => 'integer'],
        'region_id' => ['type' => 'integer'],
    ]));

    $index->addDocuments([
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ]);
    $index->refresh();

    // With this sort the expected order of document IDs is 3, 1, 2.
    $query = new Query();
    $query->setSize(2);
    $query->setSort([
        'country_id' => 'desc',
        'region_id' => 'asc',
    ]);
    $firstPageResultSet = $index->search($query);

    $documents = $firstPageResultSet->getDocuments();
    $this->assertCount(2, $documents);

    /** @var Document $lastDocument */
    $lastDocument = array_pop($documents);

    $this->assertNotEmpty($lastDocument->getSort());

    // Continue right after the last hit of the first page.
    $query->setSearchAfter($lastDocument->getSort());

    $secondPageResultSet = $index->search($query);
    $documents = $secondPageResultSet->getDocuments();
    $this->assertCount(1, $documents);

    // The second page must hold exactly the one document that was not on the first page.
    /** @var Document $remainingDocument */
    $remainingDocument = array_pop($documents);
    $this->assertEquals('2', $remainingDocument->getId());
}
}