Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Elastica\Cluster\Health;
use Elastica\Cluster\Settings;
use Elasticsearch\Endpoints\Cluster\State;

/**
* Cluster information for elasticsearch.
Expand Down Expand Up @@ -50,7 +49,8 @@ public function __construct(Client $client)
*/
public function refresh(): void
{
$this->_response = $this->_client->requestEndpoint(new State());
$esResponse = $this->_client->getConnection()->getClient()->cluster()->state();
$this->_response = new Response($esResponse->asArray(), $esResponse->getStatusCode());
$this->_data = $this->getResponse()->getData();
}

Expand Down
9 changes: 4 additions & 5 deletions src/Cluster/Health.php
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,10 @@ public function getIndices(): array
*/
protected function _retrieveHealthData(): array
{
$endpoint = new \Elasticsearch\Endpoints\Cluster\Health();
$endpoint->setParams(['level' => 'shards']);
$esResponse = $this->_client->getConnection()->getClient()->cluster()->health([
'level' => 'shards',
]);

$response = $this->_client->requestEndpoint($endpoint);

return $response->getData();
return $esResponse->asArray();
}
}
8 changes: 4 additions & 4 deletions src/Index/Recovery.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Elastica\Index as BaseIndex;
use Elastica\Response;
use Elasticsearch\Endpoints\Indices\Recovery as RecoveryEndpoint;

/**
* Elastica index recovery object.
Expand Down Expand Up @@ -94,9 +93,10 @@ public function refresh(): self
*/
protected function getRecoveryData()
{
$endpoint = new RecoveryEndpoint();

$this->_response = $this->getIndex()->requestEndpoint($endpoint);
$esResponse = $this->getIndex()->getClient()->getConnection()->getClient()->indices()->recovery([
'index' => $this->getIndex()->getName(),
]);
$this->_response = new Response($esResponse->asArray(), $esResponse->getStatusCode());

return $this->getResponse()->getData();
}
Expand Down
5 changes: 4 additions & 1 deletion src/Index/Stats.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ public function getResponse(): Response
*/
public function refresh(): void
{
$this->_response = $this->getIndex()->requestEndpoint(new \Elasticsearch\Endpoints\Indices\Stats());
$esResponse = $this->getIndex()->getClient()->getConnection()->getClient()->indices()->stats([
'index' => $this->getIndex()->getName(),
]);
$this->_response = new Response($esResponse->asArray(), $esResponse->getStatusCode());
$this->_data = $this->getResponse()->getData();
}
}
11 changes: 1 addition & 10 deletions src/Mapping.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
namespace Elastica;

use Elastica\Exception\InvalidException;
use Elasticsearch\Endpoints\Indices\Mapping\Put;
use Elasticsearch\Endpoints\Indices\PutMapping;

/**
* Elastica Mapping object.
Expand Down Expand Up @@ -164,12 +162,7 @@ public function toArray(): array
*/
public function send(Index $index, array $query = []): Response
{
// TODO: Use only PutMapping when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(PutMapping::class) ? new PutMapping() : new Put();
$endpoint->setBody($this->toArray());
$endpoint->setParams($query);

return $index->requestEndpoint($endpoint);
return $index->getClient()->putIndexMapping($index->getName(), $this->toArray(), $query);
}

/**
Expand All @@ -178,8 +171,6 @@ public function send(Index $index, array $query = []): Response
* @param array|Mapping $mapping Mapping object or properties array
*
* @throws InvalidException If invalid type
*
* @return self
*/
public static function create($mapping): Mapping
{
Expand Down
15 changes: 6 additions & 9 deletions src/Node/Info.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Elastica\Node as BaseNode;
use Elastica\Response;
use Elasticsearch\Endpoints\Nodes\Info as NodesInfo;

/**
* Elastica cluster node object.
Expand Down Expand Up @@ -94,7 +93,6 @@ public function get(...$args)
*/
public function getPort(): string
{
// Returns string in format: inet[/192.168.1.115:9201]
$data = $this->get('http_address');
$data = \substr($data, 6, -1);
$data = \explode(':', $data);
Expand All @@ -109,7 +107,6 @@ public function getPort(): string
*/
public function getIp(): string
{
// Returns string in format: inet[/192.168.1.115:9201]
$data = $this->get('http_address');
$data = \substr($data, 6, -1);
$data = \explode(':', $data);
Expand All @@ -127,7 +124,6 @@ public function getIp(): string
public function getPlugins(): array
{
if (!\in_array('plugins', $this->_params, true)) {
// Plugin data was not retrieved when refresh() was called last. Get it now.
$this->_params[] = 'plugins';
$this->refresh($this->_params);
}
Expand Down Expand Up @@ -210,15 +206,16 @@ public function refresh(array $params = []): Response
{
$this->_params = $params;

// TODO: Use only NodesInfo when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(NodesInfo::class) ? new NodesInfo() : new \Elasticsearch\Endpoints\Cluster\Nodes\Info();
$endpoint->setNodeId($this->getNode()->getId());
$requestParams = [
'node_id' => $this->getNode()->getId(),
];

if ($params) {
$endpoint->setMetric($params);
$requestParams['metric'] = $params;
}

$this->_response = $this->getNode()->getClient()->requestEndpoint($endpoint);
$esResponse = $this->getNode()->getClient()->getConnection()->getClient()->nodes()->info($requestParams);
$this->_response = new Response($esResponse->asArray(), $esResponse->getStatusCode());
$data = $this->getResponse()->getData();

$this->_data = \reset($data['nodes']);
Expand Down
10 changes: 4 additions & 6 deletions src/Node/Stats.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Elastica\Node as BaseNode;
use Elastica\Response;
use Elasticsearch\Endpoints\Nodes\Stats as NodesStats;

/**
* Elastica cluster node object.
Expand Down Expand Up @@ -107,11 +106,10 @@ public function getResponse(): Response
*/
public function refresh(): Response
{
// TODO: Use only NodesStats when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(NodesStats::class) ? new NodesStats() : new \Elasticsearch\Endpoints\Cluster\Nodes\Stats();
$endpoint->setNodeId($this->getNode()->getName());

$this->_response = $this->getNode()->getClient()->requestEndpoint($endpoint);
$esResponse = $this->getNode()->getClient()->getConnection()->getClient()->nodes()->stats([
'node_id' => $this->getNode()->getName(),
]);
$this->_response = new Response($esResponse->asArray(), $esResponse->getStatusCode());
$data = $this->getResponse()->getData();
$this->_data = \reset($data['nodes']);

Expand Down
42 changes: 18 additions & 24 deletions src/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,6 @@

use Elastica\Exception\InvalidException;
use Elastica\Processor\AbstractProcessor;
use Elasticsearch\Endpoints\AbstractEndpoint;
use Elasticsearch\Endpoints\Ingest\DeletePipeline;
use Elasticsearch\Endpoints\Ingest\GetPipeline;
use Elasticsearch\Endpoints\Ingest\Pipeline\Delete;
use Elasticsearch\Endpoints\Ingest\Pipeline\Get;
use Elasticsearch\Endpoints\Ingest\Pipeline\Put;
use Elasticsearch\Endpoints\Ingest\PutPipeline;

/**
* Elastica Pipeline object.
Expand All @@ -35,6 +28,7 @@ class Pipeline extends Param

/**
* @var AbstractProcessor[]
*
* @phpstan-var array{processors?: AbstractProcessor[]}
*/
protected $_processors = [];
Expand Down Expand Up @@ -63,12 +57,12 @@ public function create(): Response
throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.');
}

// TODO: Use only PutPipeline when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(PutPipeline::class) ? new PutPipeline() : new Put();
$endpoint->setId($this->id);
$endpoint->setBody($this->toArray());
$esResponse = $this->getClient()->getConnection()->getClient()->ingest()->putPipeline([
'id' => $this->id,
'body' => $this->toArray(),
]);

return $this->requestEndpoint($endpoint);
return new Response($esResponse->asArray(), $esResponse->getStatusCode());
}

/**
Expand All @@ -78,11 +72,11 @@ public function create(): Response
*/
public function getPipeline(string $id): Response
{
// TODO: Use only GetPipeline when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(GetPipeline::class) ? new GetPipeline() : new Get();
$endpoint->setId($id);
$esResponse = $this->getClient()->getConnection()->getClient()->ingest()->getPipeline([
'id' => $id,
]);

return $this->requestEndpoint($endpoint);
return new Response($esResponse->asArray(), $esResponse->getStatusCode());
}

/**
Expand All @@ -92,11 +86,11 @@ public function getPipeline(string $id): Response
*/
public function deletePipeline(string $id): Response
{
// TODO: Use only DeletePipeline when dropping support for elasticsearch/elasticsearch 7.x
$endpoint = \class_exists(DeletePipeline::class) ? new DeletePipeline() : new Delete();
$endpoint->setId($id);
$esResponse = $this->getClient()->getConnection()->getClient()->ingest()->deletePipeline([
'id' => $id,
]);

return $this->requestEndpoint($endpoint);
return new Response($esResponse->asArray(), $esResponse->getStatusCode());
}

/**
Expand Down Expand Up @@ -171,11 +165,11 @@ public function getClient(): Client

/**
* Makes calls to the elasticsearch server with usage official client Endpoint based on this index.
*
* @deprecated This method is deprecated in Elasticsearch v9
*/
public function requestEndpoint(AbstractEndpoint $endpoint): Response
public function requestEndpoint($endpoint): Response
{
$cloned = clone $endpoint;

return $this->getClient()->requestEndpoint($cloned);
throw new \RuntimeException('requestEndpoint() is deprecated in Elasticsearch v9. AbstractEndpoint class no longer exists. Use direct client methods like $client->ingest()->putPipeline() instead.');
}
}
23 changes: 15 additions & 8 deletions src/Reindex.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Reindex extends Param
public const OPERATION_TYPE_CREATE = 'create';
public const CONFLICTS = 'conflicts';
public const CONFLICTS_PROCEED = 'proceed';
public const SIZE = 'size';
public const SIZE = 'max_docs'; // renamed from 'size' in ES 9.x
public const QUERY = 'query';
public const SORT = 'sort';
public const SCRIPT = 'script';
Expand Down Expand Up @@ -67,12 +67,20 @@ public function run(): Response
{
$body = $this->_getBody($this->_oldIndex, $this->_newIndex, $this->getParams());

$reindexEndpoint = new \Elasticsearch\Endpoints\Reindex();
$params = \array_intersect_key($this->getParams(), \array_fill_keys($reindexEndpoint->getParamWhitelist(), null));
$reindexEndpoint->setParams($params);
$reindexEndpoint->setBody($body);

$this->_lastResponse = $this->_oldIndex->getClient()->requestEndpoint($reindexEndpoint);
$allowedParams = [
self::WAIT_FOR_COMPLETION,
self::WAIT_FOR_ACTIVE_SHARDS,
self::TIMEOUT,
self::SCROLL,
self::REQUESTS_PER_SECOND,
self::REFRESH,
self::SLICES,
];
$params = \array_intersect_key($this->getParams(), \array_fill_keys($allowedParams, null));
$params['body'] = $body;

$esResponse = $this->_oldIndex->getClient()->getConnection()->getClient()->reindex($params);
$this->_lastResponse = new Response($esResponse->asArray(), $esResponse->getStatusCode());

return $this->_lastResponse;
}
Expand Down Expand Up @@ -166,7 +174,6 @@ protected function _getDestPartBody(Index $index, array $params): array
'index' => $index->getName(),
], $this->_resolveDestOptions($params));

// Resolves the pipeline name
$pipeline = $destBody[self::PIPELINE] ?? null;
if ($pipeline instanceof Pipeline) {
$destBody[self::PIPELINE] = $pipeline->getId();
Expand Down
29 changes: 14 additions & 15 deletions src/Snapshot.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Elastica\Exception\NotFoundException;
use Elastica\Exception\ResponseException;
use Elasticsearch\Endpoints\Snapshot\Restore;

/**
* Class Snapshot.
Expand Down Expand Up @@ -47,8 +46,8 @@ public function registerRepository($name, $type, $settings = [])
*
* @param string $name the name of the desired repository
*
* @throws Exception\ResponseException
* @throws Exception\NotFoundException
* @throws ResponseException
* @throws NotFoundException
*
* @return array
*/
Expand Down Expand Up @@ -98,8 +97,8 @@ public function createSnapshot($repository, $name, $options = [], $waitForComple
* @param string $repository the name of the repository from which to retrieve the snapshot
* @param string $name the name of the desired snapshot
*
* @throws Exception\ResponseException
* @throws Exception\NotFoundException
* @throws ResponseException
* @throws NotFoundException
*
* @return array
*/
Expand Down Expand Up @@ -155,16 +154,16 @@ public function deleteSnapshot($repository, $name)
*/
public function restoreSnapshot($repository, $name, $options = [], $waitForCompletion = false)
{
$endpoint = (new Restore())
->setRepository($repository)
->setSnapshot($name)
->setBody($options)
->setParams([
'wait_for_completion' => $waitForCompletion ? 'true' : 'false',
])
;

return $this->_client->requestEndpoint($endpoint);
$params = [
'repository' => $repository,
'snapshot' => $name,
'body' => $options,
'wait_for_completion' => $waitForCompletion ? 'true' : 'false',
];

$esResponse = $this->_client->getConnection()->getClient()->snapshot()->restore($params);

return new Response($esResponse->asArray(), $esResponse->getStatusCode());
}

/**
Expand Down
Loading
Loading