Skip to content

Commit 3e5be52

Browse files
Fix Celery Instrumentation for Workers (#1133)
* Add new instrumentation for celery * Fix wrapper detection * Move instrumentation testing * Add metaprogramming instrumentation * Remove build_tracer instrumentation again * Fix code level metrics for celery * Cleanup * Add code level metrics tests to all task methods * Wire in CLM source to task name lookup * Fix bug in validate_transaction_count * Module scoped fixture * Fix python 2 syntax --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
1 parent df2b159 commit 3e5be52

File tree

5 files changed

+133
-26
lines changed

5 files changed

+133
-26
lines changed

newrelic/config.py

+5
Original file line numberDiff line numberDiff line change
@@ -4374,6 +4374,11 @@ def _process_module_builtin_defaults():
43744374
"newrelic.hooks.application_celery",
43754375
"instrument_celery_app_task",
43764376
)
4377+
_process_module_definition(
4378+
"celery.app.trace",
4379+
"newrelic.hooks.application_celery",
4380+
"instrument_celery_app_trace",
4381+
)
43774382
_process_module_definition("celery.worker", "newrelic.hooks.application_celery", "instrument_celery_worker")
43784383
_process_module_definition(
43794384
"celery.concurrency.processes",

newrelic/hooks/application_celery.py

+41-10
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@
2828
from newrelic.api.message_trace import MessageTrace
2929
from newrelic.api.pre_function import wrap_pre_function
3030
from newrelic.api.transaction import current_transaction
31-
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper
31+
from newrelic.common.object_wrapper import FunctionWrapper, wrap_function_wrapper, _NRBoundFunctionWrapper
3232
from newrelic.core.agent import shutdown_agent
3333

3434
UNKNOWN_TASK_NAME = "<Unknown Task>"
3535
MAPPING_TASK_NAMES = {"celery.starmap", "celery.map"}
3636

3737

38-
def task_name(*args, **kwargs):
38+
def task_info(instance, *args, **kwargs):
3939
# Grab the current task, which can be located in either place
40-
if args:
40+
if instance:
41+
task = instance
42+
elif args:
4143
task = args[0]
4244
elif "task" in kwargs:
4345
task = kwargs["task"]
@@ -46,27 +48,27 @@ def task_name(*args, **kwargs):
4648

4749
# Task can be either a task instance or a signature, which subclasses dict, or an actual dict in some cases.
4850
task_name = getattr(task, "name", None) or task.get("task", UNKNOWN_TASK_NAME)
51+
task_source = task
4952

5053
# Under mapping tasks, the root task name isn't descriptive enough so we append the
5154
# subtask name to differentiate between different mapping tasks
5255
if task_name in MAPPING_TASK_NAMES:
5356
try:
5457
subtask = kwargs["task"]["task"]
5558
task_name = "/".join((task_name, subtask))
59+
task_source = task.app._tasks[subtask]
5660
except Exception:
5761
pass
5862

59-
return task_name
63+
return task_name, task_source
6064

6165

6266
def CeleryTaskWrapper(wrapped):
6367
def wrapper(wrapped, instance, args, kwargs):
6468
transaction = current_transaction(active_only=False)
6569

66-
if instance is not None:
67-
_name = task_name(instance, *args, **kwargs)
68-
else:
69-
_name = task_name(*args, **kwargs)
70+
# Grab task name and source
71+
_name, _source = task_info(instance, *args, **kwargs)
7072

7173
# A Celery Task can be called either outside of a transaction, or
7274
# within the context of an existing transaction. There are 3
@@ -93,11 +95,11 @@ def wrapper(wrapped, instance, args, kwargs):
9395
return wrapped(*args, **kwargs)
9496

9597
elif transaction:
96-
with FunctionTrace(_name, source=instance):
98+
with FunctionTrace(_name, source=_source):
9799
return wrapped(*args, **kwargs)
98100

99101
else:
100-
with BackgroundTask(application_instance(), _name, "Celery", source=instance) as transaction:
102+
with BackgroundTask(application_instance(), _name, "Celery", source=_source) as transaction:
101103
# Attempt to grab distributed tracing headers
102104
try:
103105
# Headers on earlier versions of Celery may end up as attributes
@@ -200,6 +202,26 @@ def wrap_Celery_send_task(wrapped, instance, args, kwargs):
200202
return wrapped(*args, **kwargs)
201203

202204

205+
def wrap_worker_optimizations(wrapped, instance, args, kwargs):
206+
# Attempt to uninstrument BaseTask before stack protection is installed or uninstalled
207+
try:
208+
from celery.app.task import BaseTask
209+
210+
if isinstance(BaseTask.__call__, _NRBoundFunctionWrapper):
211+
BaseTask.__call__ = BaseTask.__call__.__wrapped__
212+
except Exception:
213+
BaseTask = None
214+
215+
# Allow metaprogramming to run
216+
result = wrapped(*args, **kwargs)
217+
218+
# Rewrap finalized BaseTask
219+
if BaseTask: # Ensure imports succeeded
220+
BaseTask.__call__ = CeleryTaskWrapper(BaseTask.__call__)
221+
222+
return result
223+
224+
203225
def instrument_celery_app_base(module):
204226
if hasattr(module, "Celery") and hasattr(module.Celery, "send_task"):
205227
wrap_function_wrapper(module, "Celery.send_task", wrap_Celery_send_task)
@@ -239,3 +261,12 @@ def force_agent_shutdown(*args, **kwargs):
239261

240262
if hasattr(module, "Worker"):
241263
wrap_pre_function(module, "Worker._do_exit", force_agent_shutdown)
264+
265+
266+
def instrument_celery_app_trace(module):
267+
# Uses same wrapper for setup and reset worker optimizations to prevent patching and unpatching from removing wrappers
268+
if hasattr(module, "setup_worker_optimizations"):
269+
wrap_function_wrapper(module, "setup_worker_optimizations", wrap_worker_optimizations)
270+
271+
if hasattr(module, "reset_worker_optimizations"):
272+
wrap_function_wrapper(module, "reset_worker_optimizations", wrap_worker_optimizations)

tests/application_celery/test_task_methods.py

+35-14
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,32 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import pytest
1516

1617
from _target_application import add, tsum
17-
from celery import chain, chord, group
18+
from testing_support.validators.validate_code_level_metrics import (
19+
validate_code_level_metrics,
20+
)
1821
from testing_support.validators.validate_transaction_count import (
1922
validate_transaction_count,
2023
)
2124
from testing_support.validators.validate_transaction_metrics import (
2225
validate_transaction_metrics,
2326
)
2427

25-
FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)]
28+
import celery
2629

2730

28-
def test_task_wrapping_detection():
29-
"""
30-
Ensure celery detects our monkeypatching properly and will run our instrumentation
31-
on __call__ and runs that instead of micro-optimizing it away to a run() call.
31+
FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)]
3232

33-
If this is not working, most other tests in this file will fail as the different ways
34-
of running celery tasks will not all run our instrumentation.
35-
"""
36-
from celery.app.trace import task_has_custom
3733

38-
assert task_has_custom(add, "__call__")
34+
@pytest.fixture(scope="module", autouse=True, params=[False, True], ids=["unpatched", "patched"])
35+
def with_worker_optimizations(request, celery_worker_available):
36+
if request.param:
37+
celery.app.trace.setup_worker_optimizations(celery_worker_available.app)
38+
39+
yield request.param
40+
celery.app.trace.reset_worker_optimizations()
3941

4042

4143
@validate_transaction_metrics(
@@ -45,6 +47,7 @@ def test_task_wrapping_detection():
4547
rollup_metrics=FORGONE_TASK_METRICS,
4648
background_task=True,
4749
)
50+
@validate_code_level_metrics("_target_application", "add")
4851
@validate_transaction_count(1)
4952
def test_celery_task_call():
5053
"""
@@ -61,6 +64,7 @@ def test_celery_task_call():
6164
rollup_metrics=FORGONE_TASK_METRICS,
6265
background_task=True,
6366
)
67+
@validate_code_level_metrics("_target_application", "add")
6468
@validate_transaction_count(1)
6569
def test_celery_task_apply():
6670
"""
@@ -78,6 +82,7 @@ def test_celery_task_apply():
7882
rollup_metrics=FORGONE_TASK_METRICS,
7983
background_task=True,
8084
)
85+
@validate_code_level_metrics("_target_application", "add")
8186
@validate_transaction_count(1)
8287
def test_celery_task_delay():
8388
"""
@@ -95,6 +100,7 @@ def test_celery_task_delay():
95100
rollup_metrics=FORGONE_TASK_METRICS,
96101
background_task=True,
97102
)
103+
@validate_code_level_metrics("_target_application", "add")
98104
@validate_transaction_count(1)
99105
def test_celery_task_apply_async():
100106
"""
@@ -112,6 +118,7 @@ def test_celery_task_apply_async():
112118
rollup_metrics=FORGONE_TASK_METRICS,
113119
background_task=True,
114120
)
121+
@validate_code_level_metrics("_target_application", "add")
115122
@validate_transaction_count(1)
116123
def test_celery_app_send_task(celery_session_app):
117124
"""
@@ -129,6 +136,7 @@ def test_celery_app_send_task(celery_session_app):
129136
rollup_metrics=FORGONE_TASK_METRICS,
130137
background_task=True,
131138
)
139+
@validate_code_level_metrics("_target_application", "add")
132140
@validate_transaction_count(1)
133141
def test_celery_task_signature():
134142
"""
@@ -154,6 +162,8 @@ def test_celery_task_signature():
154162
background_task=True,
155163
index=-2,
156164
)
165+
@validate_code_level_metrics("_target_application", "add")
166+
@validate_code_level_metrics("_target_application", "add", index=-2)
157167
@validate_transaction_count(2)
158168
def test_celery_task_link():
159169
"""
@@ -179,12 +189,14 @@ def test_celery_task_link():
179189
background_task=True,
180190
index=-2,
181191
)
192+
@validate_code_level_metrics("_target_application", "add")
193+
@validate_code_level_metrics("_target_application", "add", index=-2)
182194
@validate_transaction_count(2)
183195
def test_celery_chain():
184196
"""
185197
Executes multiple tasks on worker process and returns an AsyncResult.
186198
"""
187-
result = chain(add.s(3, 4), add.s(5))()
199+
result = celery.chain(add.s(3, 4), add.s(5))()
188200

189201
result = result.get()
190202
assert result == 12
@@ -205,12 +217,14 @@ def test_celery_chain():
205217
background_task=True,
206218
index=-2,
207219
)
220+
@validate_code_level_metrics("_target_application", "add")
221+
@validate_code_level_metrics("_target_application", "add", index=-2)
208222
@validate_transaction_count(2)
209223
def test_celery_group():
210224
"""
211225
Executes multiple tasks on worker process and returns an AsyncResult.
212226
"""
213-
result = group(add.s(3, 4), add.s(1, 2))()
227+
result = celery.group(add.s(3, 4), add.s(1, 2))()
214228
result = result.get()
215229
assert result == [7, 3]
216230

@@ -238,12 +252,15 @@ def test_celery_group():
238252
background_task=True,
239253
index=-3,
240254
)
255+
@validate_code_level_metrics("_target_application", "tsum")
256+
@validate_code_level_metrics("_target_application", "add", index=-2)
257+
@validate_code_level_metrics("_target_application", "add", index=-3)
241258
@validate_transaction_count(3)
242259
def test_celery_chord():
243260
"""
244261
Executes 2 add tasks, followed by a tsum task on the worker process and returns an AsyncResult.
245262
"""
246-
result = chord([add.s(3, 4), add.s(1, 2)])(tsum.s())
263+
result = celery.chord([add.s(3, 4), add.s(1, 2)])(tsum.s())
247264
result = result.get()
248265
assert result == 10
249266

@@ -255,6 +272,7 @@ def test_celery_chord():
255272
rollup_metrics=[("Function/_target_application.tsum", 2)],
256273
background_task=True,
257274
)
275+
@validate_code_level_metrics("_target_application", "tsum", count=3)
258276
@validate_transaction_count(1)
259277
def test_celery_task_map():
260278
"""
@@ -272,6 +290,7 @@ def test_celery_task_map():
272290
rollup_metrics=[("Function/_target_application.add", 2)],
273291
background_task=True,
274292
)
293+
@validate_code_level_metrics("_target_application", "add", count=3)
275294
@validate_transaction_count(1)
276295
def test_celery_task_starmap():
277296
"""
@@ -297,6 +316,8 @@ def test_celery_task_starmap():
297316
background_task=True,
298317
index=-2,
299318
)
319+
@validate_code_level_metrics("_target_application", "add", count=2)
320+
@validate_code_level_metrics("_target_application", "add", count=2, index=-2)
300321
@validate_transaction_count(2)
301322
def test_celery_task_chunks():
302323
"""
+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright 2010 New Relic, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from _target_application import add
16+
17+
import celery
18+
19+
from newrelic.common.object_wrapper import _NRBoundFunctionWrapper
20+
21+
22+
FORGONE_TASK_METRICS = [("Function/_target_application.add", None), ("Function/_target_application.tsum", None)]
23+
24+
25+
def test_task_wrapping_detection():
26+
"""
27+
Ensure celery detects our monkeypatching properly and will run our instrumentation
28+
on __call__ and runs that instead of micro-optimizing it away to a run() call.
29+
30+
If this is not working, most other tests in this file will fail as the different ways
31+
of running celery tasks will not all run our instrumentation.
32+
"""
33+
assert celery.app.trace.task_has_custom(add, "__call__")
34+
35+
36+
def test_worker_optimizations_preserve_instrumentation(celery_worker_available):
37+
is_instrumented = lambda: isinstance(celery.app.task.BaseTask.__call__, _NRBoundFunctionWrapper)
38+
39+
celery.app.trace.reset_worker_optimizations()
40+
assert is_instrumented(), "Instrumentation not initially applied."
41+
42+
celery.app.trace.setup_worker_optimizations(celery_worker_available.app)
43+
assert is_instrumented(), "setup_worker_optimizations removed instrumentation."
44+
45+
celery.app.trace.reset_worker_optimizations()
46+
assert is_instrumented(), "reset_worker_optimizations removed instrumentation."

tests/testing_support/validators/validate_transaction_count.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,22 @@
1717

1818

1919
def validate_transaction_count(count):
20-
_transactions = []
20+
transactions = []
2121

2222
@transient_function_wrapper('newrelic.core.stats_engine',
2323
'StatsEngine.record_transaction')
2424
def _increment_count(wrapped, instance, args, kwargs):
25-
_transactions.append(getattr(args[0], "name", True))
25+
transactions.append(getattr(args[0], "name", True))
2626
return wrapped(*args, **kwargs)
2727

2828
@function_wrapper
2929
def _validate_transaction_count(wrapped, instance, args, kwargs):
3030
_new_wrapped = _increment_count(wrapped)
3131
result = _new_wrapped(*args, **kwargs)
32+
33+
_transactions = list(transactions)
34+
del transactions[:] # Clear list for subsequent test runs
35+
3236
assert count == len(_transactions), (count, len(_transactions), _transactions)
3337

3438
return result

0 commit comments

Comments
 (0)