Skip to content

Commit ff716e8

Browse files
hasierhaotianw465
authored andcommitted
Stream Django SQL queries and add flag to toggle their streaming (#111)
* Add params and functions to allow recording and streaming SQL queries * Set general default to True and adapt SQLAlchemy to follow the flag * Update docs * Set stream_sql to True by default * Unpatch dbapi2, patch use custom cursor for Django and chunked_cursor
1 parent edefb3d commit ff716e8

File tree

10 files changed

+180
-15
lines changed

10 files changed

+180
-15
lines changed

CHANGELOG.rst

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44

55
unreleased
66
==========
7+
* feature: Stream Django ORM SQL queries and add flag to toggle their streaming
78
* feature: Recursively patch any given module functions with capture
89

910
2.2.0

README.md

+17
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,19 @@ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
251251
pass
252252
```
253253

254+
### Trace SQL queries
255+
By default, if no other value is provided to `.configure()`, SQL trace streaming is enabled
256+
for all the supported DB engines. Those currently are:
257+
- Any engine attached to the Django ORM.
258+
- Any engine attached to SQLAlchemy.
259+
260+
The behaviour can be toggled by sending the appropriate `stream_sql` value, for example:
261+
```python
262+
from aws_xray_sdk.core import xray_recorder
263+
264+
xray_recorder.configure(service='fallback_name', stream_sql=True)
265+
```
266+
254267
### Patch third-party libraries
255268

256269
```python
@@ -297,6 +310,10 @@ MIDDLEWARE = [
297310
# ... other middlewares
298311
]
299312
```
313+
#### SQL tracing
314+
If Django's ORM is patched - either using the `AUTO_INSTRUMENT = True` in your settings file
315+
or explicitly calling `patch_db()` - the SQL query trace streaming can then be enabled or
316+
disabled updating the `STREAM_SQL` variable in your settings file. It is enabled by default.
300317

301318
#### Automatic patching
302319
The automatic module patching can also be configured through Django settings.

aws_xray_sdk/core/recorder.py

+14-1
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def __init__(self):
7272
self._dynamic_naming = None
7373
self._aws_metadata = copy.deepcopy(XRAY_META)
7474
self._origin = None
75+
self._stream_sql = True
7576

7677
if type(self.sampler).__name__ == 'DefaultSampler':
7778
self.sampler.load_settings(DaemonConfig(), self.context)
@@ -81,7 +82,8 @@ def configure(self, sampling=None, plugins=None,
8182
daemon_address=None, service=None,
8283
context=None, emitter=None, streaming=None,
8384
dynamic_naming=None, streaming_threshold=None,
84-
max_trace_back=None, sampler=None):
85+
max_trace_back=None, sampler=None,
86+
stream_sql=True):
8587
"""Configure global X-Ray recorder.
8688
8789
Configure needs to run before patching thrid party libraries
@@ -130,6 +132,7 @@ class to have your own implementation of the streaming process.
130132
maximum number of subsegments within a segment.
131133
:param int max_trace_back: The maxinum number of stack traces recorded
132134
by auto-capture. Lower this if a single document becomes too large.
135+
:param bool stream_sql: Whether SQL query texts should be streamed.
133136
134137
Environment variables AWS_XRAY_DAEMON_ADDRESS, AWS_XRAY_CONTEXT_MISSING
135138
and AWS_XRAY_TRACING_NAME respectively overrides arguments
@@ -159,6 +162,8 @@ class to have your own implementation of the streaming process.
159162
self.streaming_threshold = streaming_threshold
160163
if max_trace_back:
161164
self.max_trace_back = max_trace_back
165+
if stream_sql is not None:
166+
self.stream_sql = stream_sql
162167

163168
if plugins:
164169
plugin_modules = get_plugin_modules(plugins)
@@ -548,3 +553,11 @@ def max_trace_back(self):
548553
@max_trace_back.setter
549554
def max_trace_back(self, value):
550555
self._max_trace_back = value
556+
557+
@property
558+
def stream_sql(self):
559+
return self._stream_sql
560+
561+
@stream_sql.setter
562+
def stream_sql(self, value):
563+
self._stream_sql = value

aws_xray_sdk/ext/django/apps.py

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def ready(self):
3636
dynamic_naming=settings.DYNAMIC_NAMING,
3737
streaming_threshold=settings.STREAMING_THRESHOLD,
3838
max_trace_back=settings.MAX_TRACE_BACK,
39+
stream_sql=settings.STREAM_SQL,
3940
)
4041

4142
if settings.PATCH_MODULES:

aws_xray_sdk/ext/django/conf.py

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
'DYNAMIC_NAMING': None,
1515
'STREAMING_THRESHOLD': None,
1616
'MAX_TRACE_BACK': None,
17+
'STREAM_SQL': True,
1718
'PATCH_MODULES': [],
1819
'AUTO_PATCH_PARENT_SEGMENT_NAME': None,
1920
'IGNORE_MODULE_PATTERNS': [],

aws_xray_sdk/ext/django/db.py

+47-9
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,62 @@
1+
import copy
12
import logging
23
import importlib
34

45
from django.db import connections
56

7+
from aws_xray_sdk.core import xray_recorder
68
from aws_xray_sdk.ext.dbapi2 import XRayTracedCursor
79

810
log = logging.getLogger(__name__)
911

1012

1113
def patch_db():
12-
1314
for conn in connections.all():
1415
module = importlib.import_module(conn.__module__)
1516
_patch_conn(getattr(module, conn.__class__.__name__))
1617

1718

18-
def _patch_conn(conn):
19-
20-
attr = '_xray_original_cursor'
19+
class DjangoXRayTracedCursor(XRayTracedCursor):
20+
def execute(self, query, *args, **kwargs):
21+
if xray_recorder.stream_sql:
22+
_previous_meta = copy.copy(self._xray_meta)
23+
self._xray_meta['sanitized_query'] = query
24+
result = super(DjangoXRayTracedCursor, self).execute(query, *args, **kwargs)
25+
if xray_recorder.stream_sql:
26+
self._xray_meta = _previous_meta
27+
return result
28+
29+
def executemany(self, query, *args, **kwargs):
30+
if xray_recorder.stream_sql:
31+
_previous_meta = copy.copy(self._xray_meta)
32+
self._xray_meta['sanitized_query'] = query
33+
result = super(DjangoXRayTracedCursor, self).executemany(query, *args, **kwargs)
34+
if xray_recorder.stream_sql:
35+
self._xray_meta = _previous_meta
36+
return result
37+
38+
def callproc(self, proc, args):
39+
if xray_recorder.stream_sql:
40+
_previous_meta = copy.copy(self._xray_meta)
41+
self._xray_meta['sanitized_query'] = proc
42+
result = super(DjangoXRayTracedCursor, self).callproc(proc, args)
43+
if xray_recorder.stream_sql:
44+
self._xray_meta = _previous_meta
45+
return result
46+
47+
48+
def _patch_cursor(cursor_name, conn):
49+
attr = '_xray_original_{}'.format(cursor_name)
2150

2251
if hasattr(conn, attr):
23-
log.debug('django built-in db already patched')
52+
log.debug('django built-in db {} already patched'.format(cursor_name))
53+
return
54+
55+
if not hasattr(conn, cursor_name):
56+
log.debug('django built-in db does not have {}'.format(cursor_name))
2457
return
2558

26-
setattr(conn, attr, conn.cursor)
59+
setattr(conn, attr, getattr(conn, cursor_name))
2760

2861
meta = {}
2962

@@ -45,7 +78,12 @@ def cursor(self, *args, **kwargs):
4578
if user:
4679
meta['user'] = user
4780

48-
return XRayTracedCursor(
49-
self._xray_original_cursor(*args, **kwargs), meta)
81+
original_cursor = getattr(self, attr)(*args, **kwargs)
82+
return DjangoXRayTracedCursor(original_cursor, meta)
83+
84+
setattr(conn, cursor_name, cursor)
5085

51-
conn.cursor = cursor
86+
87+
def _patch_conn(conn):
88+
_patch_cursor('cursor', conn)
89+
_patch_cursor('chunked_cursor', conn)

aws_xray_sdk/ext/sqlalchemy/util/decorators.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ def wrapper(*args, **kw):
4747
if isinstance(arg, XRayQuery):
4848
try:
4949
sql = parse_bind(arg.session.bind)
50-
sql['sanitized_query'] = str(arg)
50+
if xray_recorder.stream_sql:
51+
sql['sanitized_query'] = str(arg)
5152
except Exception:
5253
sql = None
5354
if sql is not None:

tests/ext/django/test_db.py

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import django
2+
3+
import pytest
4+
5+
from aws_xray_sdk.core import xray_recorder
6+
from aws_xray_sdk.core.context import Context
7+
from aws_xray_sdk.ext.django.db import patch_db
8+
9+
10+
@pytest.fixture(scope='module', autouse=True)
11+
def setup():
12+
django.setup()
13+
xray_recorder.configure(context=Context(),
14+
context_missing='LOG_ERROR')
15+
patch_db()
16+
17+
18+
@pytest.fixture(scope='module')
19+
def user_class(setup):
20+
from django.db import models
21+
from django_fake_model import models as f
22+
23+
class User(f.FakeModel):
24+
name = models.CharField(max_length=255)
25+
password = models.CharField(max_length=255)
26+
27+
return User
28+
29+
30+
@pytest.fixture(
31+
autouse=True,
32+
params=[
33+
False,
34+
True,
35+
]
36+
)
37+
@pytest.mark.django_db
38+
def func_setup(request, user_class):
39+
xray_recorder.stream_sql = request.param
40+
xray_recorder.clear_trace_entities()
41+
xray_recorder.begin_segment('name')
42+
try:
43+
user_class.create_table()
44+
yield
45+
finally:
46+
xray_recorder.clear_trace_entities()
47+
try:
48+
user_class.delete_table()
49+
finally:
50+
xray_recorder.end_segment()
51+
52+
53+
def _assert_query(sql_meta):
54+
if xray_recorder.stream_sql:
55+
assert 'sanitized_query' in sql_meta
56+
assert sql_meta['sanitized_query']
57+
assert sql_meta['sanitized_query'].startswith('SELECT')
58+
else:
59+
if 'sanitized_query' in sql_meta:
60+
assert sql_meta['sanitized_query']
61+
# Django internally executes queries for table checks, ignore those
62+
assert not sql_meta['sanitized_query'].startswith('SELECT')
63+
64+
65+
def test_all(user_class):
66+
""" Test calling all() on get all records.
67+
Verify we run the query and return the SQL as metadata"""
68+
# Materialising the query executes the SQL
69+
list(user_class.objects.all())
70+
subsegment = xray_recorder.current_segment().subsegments[-1]
71+
sql = subsegment.sql
72+
assert sql['database_type'] == 'sqlite'
73+
_assert_query(sql)
74+
75+
76+
def test_filter(user_class):
77+
""" Test calling filter() to get filtered records.
78+
Verify we run the query and return the SQL as metadata"""
79+
# Materialising the query executes the SQL
80+
list(user_class.objects.filter(password='mypassword!').all())
81+
subsegment = xray_recorder.current_segment().subsegments[-1]
82+
sql = subsegment.sql
83+
assert sql['database_type'] == 'sqlite'
84+
_assert_query(sql)
85+
if xray_recorder.stream_sql:
86+
assert 'mypassword!' not in sql['sanitized_query']
87+
assert '"password" = %s' in sql['sanitized_query']

tests/ext/flask_sqlalchemy/test_query.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@ class User(db.Model):
2222
password = db.Column(db.String(255), nullable=False)
2323

2424

25-
@pytest.fixture()
26-
def session():
25+
@pytest.fixture(
26+
params=[
27+
False,
28+
True,
29+
],
30+
)
31+
def session(request):
2732
"""Test Fixture to Create DataBase Tables and start a trace segment"""
28-
xray_recorder.configure(service='test', sampling=False, context=Context())
33+
xray_recorder.configure(service='test', sampling=False, context=Context(), stream_sql=request.param)
2934
xray_recorder.clear_trace_entities()
3035
xray_recorder.begin_segment('SQLAlchemyTest')
3136
db.create_all()
@@ -41,8 +46,8 @@ def test_all(capsys, session):
4146
User.query.all()
4247
subsegment = find_subsegment_by_annotation(xray_recorder.current_segment(), 'sqlalchemy', 'sqlalchemy.orm.query.all')
4348
assert subsegment['annotations']['sqlalchemy'] == 'sqlalchemy.orm.query.all'
44-
assert subsegment['sql']['sanitized_query']
4549
assert subsegment['sql']['url']
50+
assert bool(subsegment['sql'].get('sanitized_query', None)) is xray_recorder.stream_sql
4651

4752

4853
def test_add(capsys, session):

tox.ini

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ deps =
1818
future
1919
# the sdk doesn't support earlier version of django
2020
django >= 1.10, <2.0
21+
django-fake-model
2122
pynamodb >= 3.3.1
2223
psycopg2
2324
pg8000

0 commit comments

Comments
 (0)