Skip to content

Commit 36cc767

Browse files
committed
Adds async Elasticsearch support
1 parent ee128f8 commit 36cc767

12 files changed

+1558
-1000
lines changed

newrelic/config.py

+364-993
Large diffs are not rendered by default.

newrelic/hooks/datastore_elasticsearch.py

+254-7
Large diffs are not rendered by default.

tests/datastore_elasticsearch/conftest.py

+7
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,10 @@ def client():
4747
from elasticsearch import Elasticsearch
4848

4949
return Elasticsearch(ES_URL)
50+
51+
52+
@pytest.fixture(scope="session")
53+
def async_client():
54+
from elasticsearch import AsyncElasticsearch
55+
56+
return AsyncElasticsearch(ES_URL)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pytest
16+
17+
try:
18+
from elasticsearch._async.http_aiohttp import AsyncConnection
19+
except ImportError:
20+
from elastic_transport._models import NodeConfig
21+
from elastic_transport._node._base_async import BaseAsyncNode as AsyncConnection
22+
23+
from conftest import ES_VERSION, ES_SETTINGS
24+
25+
26+
HOST = {"scheme": "http", "host": ES_SETTINGS["host"], "port": int(ES_SETTINGS["port"])}
27+
28+
IS_V8 = ES_VERSION >= (8,)
29+
SKIP_IF_V7 = pytest.mark.skipif(not IS_V8, reason="Skipping v8 tests.")
30+
SKIP_IF_V8 = pytest.mark.skipif(IS_V8, reason="Skipping v7 tests.")
31+
32+
33+
def test_connection_default():
34+
if IS_V8:
35+
conn = AsyncConnection(NodeConfig(**HOST))
36+
else:
37+
conn = AsyncConnection(**HOST)
38+
39+
assert conn._nr_host_port == (ES_SETTINGS["host"], ES_SETTINGS["port"])
40+
41+
42+
@SKIP_IF_V7
43+
def test_connection_config():
44+
conn = AsyncConnection(NodeConfig(scheme="http", host="foo", port=8888))
45+
assert conn._nr_host_port == ("foo", "8888")
46+
47+
48+
@SKIP_IF_V8
49+
def test_connection_host_arg():
50+
conn = AsyncConnection("the_host")
51+
assert conn._nr_host_port == ("the_host", "9200")
52+
53+
54+
@SKIP_IF_V8
55+
def test_connection_args():
56+
conn = AsyncConnection("the_host", 9999)
57+
assert conn._nr_host_port == ("the_host", "9999")
58+
59+
60+
@SKIP_IF_V8
61+
def test_connection_kwargs():
62+
conn = AsyncConnection(host="foo", port=8888)
63+
assert conn._nr_host_port == ("foo", "8888")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import sqlite3
16+
import pytest
17+
from testing_support.validators.validate_database_duration import validate_database_duration
18+
19+
from newrelic.api.background_task import background_task
20+
21+
from conftest import ES_VERSION
22+
23+
24+
async def _exercise_es_v7(es):
25+
await es.index(
26+
index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1
27+
)
28+
await es.index(
29+
index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2
30+
)
31+
await es.index(
32+
index="contacts", doc_type="person", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3
33+
)
34+
await es.indices.refresh("contacts")
35+
36+
37+
async def _exercise_es_v8(es):
38+
await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1)
39+
await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2)
40+
await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3)
41+
await es.indices.refresh(index="contacts")
42+
43+
44+
_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8
45+
46+
47+
@pytest.mark.asyncio
48+
@validate_database_duration()
49+
@background_task()
50+
async def test_elasticsearch_database_duration(async_client):
51+
await _exercise_es(async_client)
52+
53+
54+
@pytest.mark.asyncio
55+
@validate_database_duration()
56+
@background_task()
57+
async def test_elasticsearch_and_sqlite_database_duration(async_client):
58+
# Make Elasticsearch queries
59+
60+
await _exercise_es(async_client)
61+
62+
# Make sqlite queries
63+
64+
conn = sqlite3.connect(":memory:")
65+
cur = conn.cursor()
66+
67+
cur.execute("CREATE TABLE people (name text, age int)")
68+
cur.execute("INSERT INTO people VALUES ('Bob', 22)")
69+
70+
conn.commit()
71+
conn.close()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import pytest
15+
import elasticsearch._async.client as client
16+
from testing_support.fixtures import override_application_settings
17+
from testing_support.fixture.event_loop import event_loop as loop # noqa: F401
18+
from testing_support.util import instance_hostname
19+
from testing_support.validators.validate_transaction_errors import validate_transaction_errors
20+
from testing_support.validators.validate_transaction_metrics import validate_transaction_metrics
21+
22+
from newrelic.api.background_task import background_task
23+
24+
from conftest import ES_VERSION, ES_SETTINGS
25+
26+
27+
# Settings
28+
29+
_enable_instance_settings = {"datastore_tracer.instance_reporting.enabled": True}
30+
_disable_instance_settings = {"datastore_tracer.instance_reporting.enabled": False}
31+
32+
# Metrics
33+
34+
_base_scoped_metrics = [
35+
("Datastore/statement/Elasticsearch/_all/cluster.health", 1),
36+
("Datastore/statement/Elasticsearch/_all/search", 2),
37+
("Datastore/statement/Elasticsearch/address/index", 2),
38+
("Datastore/statement/Elasticsearch/address/search", 1),
39+
("Datastore/statement/Elasticsearch/contacts/index", 3),
40+
("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1),
41+
("Datastore/statement/Elasticsearch/contacts/search", 2),
42+
("Datastore/statement/Elasticsearch/other/search", 2),
43+
]
44+
45+
_base_rollup_metrics = [
46+
("Datastore/operation/Elasticsearch/cluster.health", 1),
47+
("Datastore/operation/Elasticsearch/index", 5),
48+
("Datastore/operation/Elasticsearch/indices.refresh", 1),
49+
("Datastore/operation/Elasticsearch/search", 7),
50+
("Datastore/statement/Elasticsearch/_all/cluster.health", 1),
51+
("Datastore/statement/Elasticsearch/_all/search", 2),
52+
("Datastore/statement/Elasticsearch/address/index", 2),
53+
("Datastore/statement/Elasticsearch/address/search", 1),
54+
("Datastore/statement/Elasticsearch/contacts/index", 3),
55+
("Datastore/statement/Elasticsearch/contacts/indices.refresh", 1),
56+
("Datastore/statement/Elasticsearch/contacts/search", 2),
57+
("Datastore/statement/Elasticsearch/other/search", 2),
58+
]
59+
60+
# Version support
61+
62+
63+
def is_importable(module_path):
64+
try:
65+
__import__(module_path)
66+
return True
67+
except ImportError:
68+
return False
69+
70+
71+
_all_count = 17
72+
73+
if is_importable("elasticsearch._async.client.cat") or is_importable("elasticsearch._async.client.cat"):
74+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1))
75+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", 1))
76+
_all_count += 1
77+
else:
78+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None))
79+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/cat.health", None))
80+
81+
if is_importable("elasticsearch._async.client.nodes") or is_importable("elasticsearch._async.client.nodes"):
82+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1))
83+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", 1))
84+
_all_count += 1
85+
else:
86+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None))
87+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/nodes.info", None))
88+
89+
if hasattr(client, "SnapshotClient") and hasattr(client.SnapshotClient, "status"):
90+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1))
91+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", 1))
92+
_all_count += 1
93+
else:
94+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None))
95+
_base_rollup_metrics.append(("Datastore/operation/Elasticsearch/snapshot.status", None))
96+
97+
if hasattr(client.IndicesClient, "status"):
98+
_base_scoped_metrics.append(("Datastore/statement/Elasticsearch/_all/indices.status", 1))
99+
_base_rollup_metrics.extend(
100+
[
101+
("Datastore/operation/Elasticsearch/indices.status", 1),
102+
("Datastore/statement/Elasticsearch/_all/indices.status", 1),
103+
]
104+
)
105+
_all_count += 1
106+
else:
107+
_base_scoped_metrics.append(("Datastore/operation/Elasticsearch/indices.status", None))
108+
_base_rollup_metrics.extend(
109+
[
110+
("Datastore/operation/Elasticsearch/indices.status", None),
111+
("Datastore/statement/Elasticsearch/_all/indices.status", None),
112+
]
113+
)
114+
115+
_base_rollup_metrics.extend(
116+
[
117+
("Datastore/all", _all_count),
118+
("Datastore/allOther", _all_count),
119+
("Datastore/Elasticsearch/all", _all_count),
120+
("Datastore/Elasticsearch/allOther", _all_count),
121+
]
122+
)
123+
124+
# Instance info
125+
126+
_disable_scoped_metrics = list(_base_scoped_metrics)
127+
_disable_rollup_metrics = list(_base_rollup_metrics)
128+
129+
_enable_scoped_metrics = list(_base_scoped_metrics)
130+
_enable_rollup_metrics = list(_base_rollup_metrics)
131+
132+
_host = instance_hostname(ES_SETTINGS["host"])
133+
_port = ES_SETTINGS["port"]
134+
135+
_instance_metric_name = f"Datastore/instance/Elasticsearch/{_host}/{_port}"
136+
137+
_enable_rollup_metrics.append((_instance_metric_name, _all_count))
138+
139+
_disable_rollup_metrics.append((_instance_metric_name, None))
140+
141+
# Query
142+
143+
144+
async def _exercise_es_v7(es):
145+
await es.index(
146+
index="contacts", doc_type="person", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1
147+
)
148+
await es.index(
149+
index="contacts", doc_type="person", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2
150+
)
151+
await es.index(
152+
index="contacts", doc_type="person", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3
153+
)
154+
await es.indices.refresh("contacts")
155+
await es.index(
156+
index="address", doc_type="employee", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1
157+
)
158+
await es.index(
159+
index="address",
160+
doc_type="employee",
161+
body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"},
162+
id=2,
163+
)
164+
await es.search(index="contacts", q="name:Joe")
165+
await es.search(index="contacts", q="name:jessica")
166+
await es.search(index="address", q="name:Sherlock")
167+
await es.search(index=["contacts", "address"], q="name:Bilbo")
168+
await es.search(index="contacts,address", q="name:Bilbo")
169+
await es.search(index="*", q="name:Bilbo")
170+
await es.search(q="name:Bilbo")
171+
await es.cluster.health()
172+
173+
if hasattr(es, "cat"):
174+
await es.cat.health()
175+
if hasattr(es, "nodes"):
176+
await es.nodes.info()
177+
if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"):
178+
await es.snapshot.status()
179+
if hasattr(es.indices, "status"):
180+
await es.indices.status()
181+
182+
183+
async def _exercise_es_v8(es):
184+
await es.index(index="contacts", body={"name": "Joe Tester", "age": 25, "title": "QA Engineer"}, id=1)
185+
await es.index(index="contacts", body={"name": "Jessica Coder", "age": 32, "title": "Programmer"}, id=2)
186+
await es.index(index="contacts", body={"name": "Freddy Tester", "age": 29, "title": "Assistant"}, id=3)
187+
await es.indices.refresh(index="contacts")
188+
await es.index(index="address", body={"name": "Sherlock", "address": "221B Baker Street, London"}, id=1)
189+
await es.index(index="address", body={"name": "Bilbo", "address": "Bag End, Bagshot row, Hobbiton, Shire"}, id=2)
190+
await es.search(index="contacts", q="name:Joe")
191+
await es.search(index="contacts", q="name:jessica")
192+
await es.search(index="address", q="name:Sherlock")
193+
await es.search(index=["contacts", "address"], q="name:Bilbo")
194+
await es.search(index="contacts,address", q="name:Bilbo")
195+
await es.search(index="*", q="name:Bilbo")
196+
await es.search(q="name:Bilbo")
197+
await es.cluster.health()
198+
199+
if hasattr(es, "cat"):
200+
await es.cat.health()
201+
if hasattr(es, "nodes"):
202+
await es.nodes.info()
203+
if hasattr(es, "snapshot") and hasattr(es.snapshot, "status"):
204+
await es.snapshot.status()
205+
if hasattr(es.indices, "status"):
206+
await es.indices.status()
207+
208+
209+
_exercise_es = _exercise_es_v7 if ES_VERSION < (8, 0, 0) else _exercise_es_v8
210+
211+
212+
# Test
213+
214+
215+
@validate_transaction_errors(errors=[])
216+
@validate_transaction_metrics(
217+
"test_async_elasticsearch:test_async_elasticsearch_operation_disabled",
218+
scoped_metrics=_disable_scoped_metrics,
219+
rollup_metrics=_disable_rollup_metrics,
220+
background_task=True,
221+
)
222+
@override_application_settings(_disable_instance_settings)
223+
@background_task()
224+
def test_async_elasticsearch_operation_disabled(async_client, loop):
225+
loop.run_until_complete(_exercise_es(async_client))
226+
227+
228+
@validate_transaction_errors(errors=[])
229+
@validate_transaction_metrics(
230+
"test_async_elasticsearch:test_async_elasticsearch_operation_enabled",
231+
scoped_metrics=_enable_scoped_metrics,
232+
rollup_metrics=_enable_rollup_metrics,
233+
background_task=True,
234+
)
235+
@override_application_settings(_enable_instance_settings)
236+
@background_task()
237+
def test_async_elasticsearch_operation_enabled(async_client, loop):
238+
loop.run_until_complete(_exercise_es(async_client))

0 commit comments

Comments
 (0)