-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds async Elasticsearch support #1309
base: main
Are you sure you want to change the base?
Conversation
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed just the instrumentation so far but these are the issues I immediately see. I'll take a pass at fixing them and redesigning the instance_info section for async, and we'll see if that fixes the rest of the tests.
|
||
transaction._nr_datastore_instance_info = (None, None, None) | ||
|
||
dt = DatastoreTrace(product="Elasticsearch", target=index, operation=operation, source=async_wrapper) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The source argument here should be source=wrapped
, that's for reporting the code's file name and line number for the Code Level Metrics feature. The async_wrapper
code isn't needed here. The actual wrapper needs to be a coroutine that awaits wrapped()
(probably, will check that's always true).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I fixed it.
def _nr__async_Connection__init__wrapper(wrapped, instance, args, kwargs): | ||
"""Cache datastore instance info on Connection object""" | ||
|
||
def _bind_params(host="localhost", port=9200, *args, **kwargs): | ||
return host, port | ||
|
||
host, port = _bind_params(*args, **kwargs) | ||
port = str(port) | ||
instance._nr_host_port = (host, port) | ||
|
||
return wrapped(*args, **kwargs) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this wrapper is identical to _nr_Connection__init__wrapper
, we can reuse the original instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved!
def async_BaseNode__init__wrapper(wrapped, instance, args, kwargs): | ||
result = wrapped(*args, **kwargs) | ||
instance._nr_host_port = (instance.host, str(instance.port)) | ||
return result | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also identical to the original wrapper, we can reuse it instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved!
except Exception: | ||
instance_info = ("unknown", "unknown", None) | ||
|
||
transaction._nr_datastore_instance_info = instance_info |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This wrapper is mostly right, but unfortunately it doesn't work in async contexts. When connecting to multiple databases at once there's a race condition that gets introduced if we store instance_info on the transaction object directly. We'll have to instead work out a different design where we stash it elsewhere, or call current_trace()
and carefully insert it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have any suggestions on how to do this?
36cc767
to
65d62b6
Compare
Overview
This Pull Request instruments the Elasticsearch python library with asynchronous implementation. Basically, I reused what was already implemented for the synchronous version, however, I ran into some issues with the unit tests in the following files:
If anyone can help me understand what’s going on, I would be grateful.
Related Github Issue
#1204
Testing
The agent includes a suite of tests which should be used to
verify your changes don't break existing functionality. These tests will run with
Github Actions when a pull request is made. More details on running the tests locally can be found
here,
For most contributions it is strongly recommended to add additional tests which
exercise your changes.