Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Backward Compatibility Breaks
### Added
* Added support for numeric `minimum_should_match` values in `TermsSet` query [#2293](https://github.com/ruflin/Elastica/pull/2293)
* Added support for "search after" based pagination [#1645](https://github.com/ruflin/Elastica/issues/1645)
### Changed
### Deprecated
### Removed
Expand Down
12 changes: 12 additions & 0 deletions src/Document.php
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,18 @@ public function hasPipeline(): bool
return $this->hasParam('pipeline');
}

/**
 * Returns the sort position of this document, usable as the starting offset when searching for the next page of hits.
 *
 * @see \Elastica\Query::setSearchAfter()
 *
 * @return array value stored under the "sort" param of this document
 */
public function getSort(): array
{
    $sortPosition = $this->getParam('sort');

    return $sortPosition;
}

/**
* Returns the document as an array.
*/
Expand Down
83 changes: 83 additions & 0 deletions src/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,89 @@ public function count($query = ''): int
return $search->count('', false);
}

/**
 * Iterates over all documents matching the given query, using "search after" based pagination.
 *
 * One search request is issued per page of $batchSize documents; iteration ends with the first page that
 * contains fewer documents than requested. The given query is copied, so a query object passed in is not modified.
 *
 * Since this method is a generator, nothing (including argument validation) runs until iteration starts.
 *
 * @see \Elastica\Query::setSearchAfter()
 *
 * @param mixed      $query     search query with sort by unique document field
 * @param int        $batchSize the number of documents requested per search request (applied as the query size)
 * @param array|null $options   search request options
 *
 * @throws InvalidException if the batch size is lower than 1, the query has no "sort" param or it has a non-zero "from" param
 *
 * @return \Generator|\Elastica\Document[] all documents matching the given query, yielded one by one
 */
public function each($query = '', int $batchSize = 100, ?array $options = null): \Generator
{
    $query = $this->prepareSearchAfterIteratorQuery($query, $batchSize);

    while (true) {
        // Ask the result set for its documents only once per page and reuse the list below.
        $documents = $this->search($query, $options)->getDocuments();

        foreach ($documents as $document) {
            yield $document;
        }

        // A short (or empty) page means there is nothing left to fetch.
        if (count($documents) < $batchSize) {
            break;
        }

        // The page is full, hence non-empty ($batchSize >= 1): its last document is the cursor for the next request.
        $query->setSearchAfter(end($documents)->getSort());
    }
}

/**
 * Iterates, page by page, over all documents matching the given query, using "search after" based pagination.
 *
 * Every yielded value is the non-empty list of documents returned by one search request. Iteration ends after
 * the first page holding fewer than $batchSize documents, or right away when a page comes back empty.
 *
 * Since this method is a generator, nothing (including argument validation) runs until iteration starts.
 *
 * @see \Elastica\Query::setSearchAfter()
 *
 * @param mixed      $query     search query with sort by unique document field
 * @param int        $batchSize the number of documents requested per search request (applied as the query size)
 * @param array|null $options   search request options
 *
 * @return \Generator|\Elastica\Document[][] batches of documents matching the given query
 */
public function batch($query = '', int $batchSize = 100, ?array $options = null): \Generator
{
    $pageQuery = $this->prepareSearchAfterIteratorQuery($query, $batchSize);

    do {
        $page = $this->search($pageQuery, $options)->getDocuments();
        if (!$page) {
            return;
        }

        yield $page;

        // Only a completely filled page can be followed by another one.
        $expectMore = count($page) >= $batchSize;
        if ($expectMore) {
            $pageQuery->setSearchAfter(end($page)->getSort());
        }
    } while ($expectMore);
}

/**
 * Validates the iteration arguments and builds the query object used for "search after" based iteration.
 *
 * The returned query is a copy with its size set to the batch size, so a query object passed in is left intact.
 *
 * @param mixed $query     search query, in any form accepted by Query::create()
 * @param int   $batchSize the number of documents to request per search request
 *
 * @throws InvalidException if the batch size is lower than 1, the query has no (or an empty) "sort" param,
 *                          or the query has a "from" param other than 0
 */
private function prepareSearchAfterIteratorQuery(mixed $query, int $batchSize): Query
{
    if ($batchSize < 1) {
        throw new InvalidException('Batch size must be greater than 0.');
    }

    $query = clone Query::create($query); // if original query is object - keep it intact

    // An empty sort list is as unusable as a missing one: there would be no sort values to resume from.
    if (!$query->hasParam('sort') || [] === $query->getParam('sort')) {
        throw new InvalidException('Query must have "sort" parameter in order to use "search after" based iteration.');
    }

    if ($query->hasParam('from') && 0 !== $query->getParam('from')) {
        throw new InvalidException('Query must not specify "from" parameter in order to use "search after" based iteration.');
    }

    $query->setSize($batchSize);

    return $query;
}

/**
* Opens an index.
*
Expand Down
16 changes: 16 additions & 0 deletions src/Query.php
Original file line number Diff line number Diff line change
Expand Up @@ -476,4 +476,20 @@ public function setTrackTotalHits($trackTotalHits = true): self

return $this->setParam('track_total_hits', $trackTotalHits);
}

/**
 * Makes the search return the hits that follow the given sort position, i.e. the next page of hits.
 *
 * Pass the value of {@see \Elastica\Document::getSort()} taken from the last document of the previous page.
 *
 * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#search-after
 *
 * @param array $searchAfter the sort of the last document from the previous search result set
 *
 * @return $this
 */
public function setSearchAfter(array $searchAfter): self
{
    return $this->setParam('search_after', $searchAfter);
}
}
63 changes: 63 additions & 0 deletions tests/IndexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Elastica\Document;
use Elastica\Index;
use Elastica\Mapping;
use Elastica\Query;
use Elastica\Query\QueryString;
use Elastica\Query\SimpleQueryString;
use Elastica\Query\Term;
Expand Down Expand Up @@ -856,4 +857,66 @@ public function testGetEmptyAliases(): void

$this->assertEquals([], $index->getAliases());
}

#[Group('functional')]
public function testIterateEach(): void
{
    $index = $this->_createIndex();

    $integerField = ['type' => 'integer'];
    $index->setMapping(new Mapping([
        'country_id' => $integerField,
        'region_id' => $integerField,
    ]));

    $index->addDocuments([
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ]);
    $index->refresh();

    $query = new Query();
    $query->setSize(2);
    $query->setSort(['region_id' => 'desc']);

    // Three documents with a batch size of two: the iterator has to go through two pages.
    $iteratedDocuments = iterator_to_array($index->each($query, 2), false);
    $iteratedIds = array_map(
        static fn (Document $document) => $document->getId(),
        $iteratedDocuments
    );

    $this->assertEquals(['3', '2', '1'], $iteratedIds);
}

#[Group('functional')]
public function testIterateBatch(): void
{
    $index = $this->_createIndex();

    $integerField = ['type' => 'integer'];
    $index->setMapping(new Mapping([
        'country_id' => $integerField,
        'region_id' => $integerField,
    ]));

    $index->addDocuments([
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ]);
    $index->refresh();

    $query = new Query();
    $query->setSize(2);
    $query->setSort(['region_id' => 'desc']);

    // Three documents with a batch size of two: one full batch followed by a partial one.
    $collectedIds = [];
    foreach ($index->batch($query, 2) as $batch) {
        $batchIds = array_map(
            static fn (Document $document) => $document->getId(),
            $batch
        );
        array_push($collectedIds, ...$batchIds);
    }

    $this->assertEquals(['3', '2', '1'], $collectedIds);
}
}
40 changes: 40 additions & 0 deletions tests/QueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,44 @@ public function testSetTrackTotalHits(): void
$this->assertEquals(25, $resultSet->getTotalHits());
$this->assertEquals('gte', $resultSet->getTotalHitsRelation());
}

#[Group('functional')]
public function testSearchAfter(): void
{
    $index = $this->_createIndex();
    $index->setMapping(new Mapping([
        'country_id' => ['type' => 'integer'],
        'region_id' => ['type' => 'integer'],
    ]));

    $index->addDocuments([
        new Document('1', ['country_id' => 1, 'region_id' => 1]),
        new Document('2', ['country_id' => 1, 'region_id' => 2]),
        new Document('3', ['country_id' => 2, 'region_id' => 3]),
    ]);
    $index->refresh();

    // Expected order for this sort: document 3 (country 2), then documents 1 and 2 (country 1, regions 1 and 2).
    $query = new Query();
    $query->setSize(2);
    $query->setSort([
        'country_id' => 'desc',
        'region_id' => 'asc',
    ]);
    $firstPageResultSet = $index->search($query);

    $documents = $firstPageResultSet->getDocuments();
    $this->assertCount(2, $documents);

    /** @var Document $lastDocument */
    $lastDocument = array_pop($documents);

    $this->assertNotEmpty($lastDocument->getSort());

    $query->setSearchAfter($lastDocument->getSort());

    $secondPageResultSet = $index->search($query);
    $documents = $secondPageResultSet->getDocuments();
    $this->assertCount(1, $documents);
    // The second page must continue right after the first one, not merely contain "some" document.
    $this->assertEquals('2', $documents[0]->getId());
}
}