Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds async Elasticsearch support #1309

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,357 changes: 364 additions & 993 deletions newrelic/config.py

Large diffs are not rendered by default.

242 changes: 235 additions & 7 deletions newrelic/hooks/datastore_elasticsearch.py

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions tests/datastore_elasticsearch/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ def client():
from elasticsearch import Elasticsearch

return Elasticsearch(ES_URL)


@pytest.fixture(scope="session")
def async_client():
from elasticsearch import AsyncElasticsearch

return AsyncElasticsearch(ES_URL)
63 changes: 63 additions & 0 deletions tests/datastore_elasticsearch/test_async_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

try:
from elasticsearch._async.http_aiohttp import AsyncConnection
except ImportError:
from elastic_transport._models import NodeConfig
from elastic_transport._node._base_async import BaseAsyncNode as AsyncConnection

from conftest import ES_VERSION, ES_SETTINGS


HOST = {"scheme": "http", "host": ES_SETTINGS["host"], "port": int(ES_SETTINGS["port"])}

IS_V8 = ES_VERSION >= (8,)
SKIP_IF_V7 = pytest.mark.skipif(not IS_V8, reason="Skipping v8 tests.")
SKIP_IF_V8 = pytest.mark.skipif(IS_V8, reason="Skipping v7 tests.")


def test_connection_default():
if IS_V8:
conn = AsyncConnection(NodeConfig(**HOST))
else:
conn = AsyncConnection(**HOST)

assert conn._nr_host_port == (ES_SETTINGS["host"], ES_SETTINGS["port"])


@SKIP_IF_V7
def test_connection_config():
conn = AsyncConnection(NodeConfig(scheme="http", host="foo", port=8888))
assert conn._nr_host_port == ("foo", "8888")


@SKIP_IF_V8
def test_connection_host_arg():
conn = AsyncConnection("the_host")
assert conn._nr_host_port == ("the_host", "9200")


@SKIP_IF_V8
def test_connection_args():
conn = AsyncConnection("the_host", 9999)
assert conn._nr_host_port == ("the_host", "9999")


@SKIP_IF_V8
def test_connection_kwargs():
conn = AsyncConnection(host="foo", port=8888)
assert conn._nr_host_port == ("foo", "8888")
71 changes: 71 additions & 0 deletions tests/datastore_elasticsearch/test_async_database_duration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sqlite3
import pytest
from testing_support.validators.validate_database_duration import validate_database_duration

from newrelic.api.background_task import background_task

from conftest import ES_VERSION


async def _exercise_es_v7(es):
await es.index(
index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1
)
await es.index(
index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2
)
await es.index(
index="contacts", doc_type="person", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3
)
await es.indices.refresh("contacts")


async def _exercise_es_v8(es):
await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1)
await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2)
await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3)
await es.indices.refresh(index="contacts")


_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8


@pytest.mark.asyncio
@validate_database_duration()
@background_task()
async def test_elasticsearch_database_duration(async_client):
await _exercise_es(async_client)


@pytest.mark.asyncio
@validate_database_duration()
@background_task()
async def test_elasticsearch_and_sqlite_database_duration(async_client):
# Make Elasticsearch queries

await _exercise_es(async_client)

# Make sqlite queries

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

cur.execute("CREATE TABLE people (name text, age int)")
cur.execute("INSERT INTO people VALUES ('Bob', 22)")

conn.commit()
conn.close()
238 changes: 238 additions & 0 deletions tests/datastore_elasticsearch/test_async_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
# Copyright 2010 New Relic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import elasticsearch._async.client as client
from testing_support.fixtures import override_application_settings
from testing_support.fixture.event_loop import event_loop as loop # noqa: F401
from testing_support.util import instance_hostname
from testing_support.validators.validate_transaction_errors import validate_transaction_errors
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics

from newrelic.api.background_task import background_task

from conftest import ES_VERSION, ES_SETTINGS


# Settings

_enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True}
_disable_instance_settings = {"datastore_tracer.instance_reporting.enabled": False}

# Metrics

_base_scoped_metrics = [
("Datastore/statement/Elasticsearch/_all/cluster.health", 1),
("Datastore/statement/Elasticsearch/_all/search", 2),
("Datastore/statement/Elasticsearch/address/index", 2),
("Datastore/statement/Elasticsearch/address/search", 1),
("Datastore/statement/Elasticsearch/contacts/index", 3),
("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1),
("Datastore/statement/Elasticsearch/contacts/search", 2),
("Datastore/statement/Elasticsearch/other/search", 2),
]

_base_rollup_metrics = [
("Datastore/operation/Elasticsearch/cluster.health", 1),
("Datastore/operation/Elasticsearch/index", 5),
("Datastore/operation/Elasticsearch/indices.refresh", 1),
("Datastore/operation/Elasticsearch/search", 7),
("Datastore/statement/Elasticsearch/_all/cluster.health", 1),
("Datastore/statement/Elasticsearch/_all/search", 2),
("Datastore/statement/Elasticsearch/address/index", 2),
("Datastore/statement/Elasticsearch/address/search", 1),
("Datastore/statement/Elasticsearch/contacts/index", 3),
("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1),
("Datastore/statement/Elasticsearch/contacts/search", 2),
("Datastore/statement/Elasticsearch/other/search", 2),
]

# Version support


def is_importable(module_path):
try:
__import__(module_path)
return True
except ImportError:
return False


_all_count = 17

if is_importable("elasticsearch._async.client.cat") or is_importable("elasticsearch._async.client.cat"):
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1))
_all_count += 1
else:
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None))

if is_importable("elasticsearch._async.client.nodes") or is_importable("elasticsearch._async.client.nodes"):
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1))
_all_count += 1
else:
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None))

if hasattr(client, "SnapshotClient") and hasattr(client.SnapshotClient, "status"):
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1))
_all_count += 1
else:
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None))
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None))

if hasattr(client.IndicesClient, "status"):
_base_scoped_metrics.append(("Datastore/statement/Elasticsearch/_all/indices.status", 1))
_base_rollup_metrics.extend(
[
("Datastore/operation/Elasticsearch/indices.status", 1),
("Datastore/statement/Elasticsearch/_all/indices.status", 1),
]
)
_all_count += 1
else:
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/indices.status", None))
_base_rollup_metrics.extend(
[
("Datastore/operation/Elasticsearch/indices.status", None),
("Datastore/statement/Elasticsearch/_all/indices.status", None),
]
)

_base_rollup_metrics.extend(
[
("Datastore/all", _all_count),
("Datastore/allOther", _all_count),
("Datastore/Elasticsearch/all", _all_count),
("Datastore/Elasticsearch/allOther", _all_count),
]
)

# Instance info

_disable_scoped_metrics = list(_base_scoped_metrics)
_disable_rollup_metrics = list(_base_rollup_metrics)

_enable_scoped_metrics = list(_base_scoped_metrics)
_enable_rollup_metrics = list(_base_rollup_metrics)

_host = instance_hostname(ES_SETTINGS["host"])
_port = ES_SETTINGS["port"]

_instance_metric_name = f"Datastore/instance/Elasticsearch/{_host}/{_port}"

_enable_rollup_metrics.append((_instance_metric_name, _all_count))

_disable_rollup_metrics.append((_instance_metric_name, None))

# Query


async def _exercise_es_v7(es):
await es.index(
index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1
)
await es.index(
index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2
)
await es.index(
index="contacts", doc_type="person", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3
)
await es.indices.refresh("contacts")
await es.index(
index="address", doc_type="employee", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1
)
await es.index(
index="address",
doc_type="employee",
body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"},
id=2,
)
await es.search(index="contacts", q="name:Joe")
await es.search(index="contacts", q="name:jessica")
await es.search(index="address", q="name:Sherlock")
await es.search(index=["contacts", "address"], q="name:Bilbo")
await es.search(index="contacts,address", q="name:Bilbo")
await es.search(index="*", q="name:Bilbo")
await es.search(q="name:Bilbo")
await es.cluster.health()

if hasattr(es, "cat"):
await es.cat.health()
if hasattr(es, "nodes"):
await es.nodes.info()
if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"):
await es.snapshot.status()
if hasattr(es.indices, "status"):
await es.indices.status()


async def _exercise_es_v8(es):
await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1)
await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2)
await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3)
await es.indices.refresh(index="contacts")
await es.index(index="address", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1)
await es.index(index="address", body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"}, id=2)
await es.search(index="contacts", q="name:Joe")
await es.search(index="contacts", q="name:jessica")
await es.search(index="address", q="name:Sherlock")
await es.search(index=["contacts", "address"], q="name:Bilbo")
await es.search(index="contacts,address", q="name:Bilbo")
await es.search(index="*", q="name:Bilbo")
await es.search(q="name:Bilbo")
await es.cluster.health()

if hasattr(es, "cat"):
await es.cat.health()
if hasattr(es, "nodes"):
await es.nodes.info()
if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"):
await es.snapshot.status()
if hasattr(es.indices, "status"):
await es.indices.status()


_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8


# Test


@validate_transaction_errors(errors=[])
@validate_transaction_metrics(
"test_async_elasticsearch:test_async_elasticsearch_operation_disabled",
scoped_metrics=_disable_scoped_metrics,
rollup_metrics=_disable_rollup_metrics,
background_task=True,
)
@override_application_settings(_disable_instance_settings)
@background_task()
def test_async_elasticsearch_operation_disabled(async_client, loop):
loop.run_until_complete(_exercise_es(async_client))


@validate_transaction_errors(errors=[])
@validate_transaction_metrics(
"test_async_elasticsearch:test_async_elasticsearch_operation_enabled",
scoped_metrics=_enable_scoped_metrics,
rollup_metrics=_enable_rollup_metrics,
background_task=True,
)
@override_application_settings(_enable_instance_settings)
@background_task()
def test_async_elasticsearch_operation_enabled(async_client, loop):
loop.run_until_complete(_exercise_es(async_client))
Loading