Skip to content

Commit f26c09a

Browse files
fix(tracer): avoid flushing traces from the child in case of fork (#18262) (#18646)
Fixes duplicate trace emission after `fork()` when a trace is already buffered in the parent writer. My understanding of the problem is that in the child after a fork, `tracer._child_after_fork` recreates the writer and in the process properly shuts down its worker thread which calls the `on_shutdown` callback that flushes the traces from the child. The same traces are also emitted periodically from the parent and it causes the double trace emission problem. A solution is to skip restarting the worker thread in the writer. This way when the tracer is recreated in `tracer._child_after_fork` the worker thread is not stopped a second time and does not flush traces. When the thread is stopped for the first time in `PeriodicThread_after_fork`, the `_skip_shutdown` parameter is set to `false` so flushing does not occur yet. We noticed the problem while introducing Stripe tests in the system-tests "DEFAULT" scenario. The `stripe` SDK forks on requests and this causes a duplication of a few traces preceding that stripe test. This is caught by an integrity test check that runs on all traces during the scenario. The exact problematic lines in the stripe SDK are: https://github.com/stripe/stripe-python/blob/v12.5.1/stripe/_api_requestor.py#L475 In general `subprocess.Popen` (used to exec `uname` here) should perform an `os.posix_spawn` and not trigger fork hooks. However, gevent patches `subprocess.Popen` to use `os.fork()` instead. - Added a regression test (cherry picked from commit aa02e52) Co-authored-by: Florentin Labelle <florentin.labelle@datadoghq.com>
1 parent 0bae574 commit f26c09a

5 files changed

Lines changed: 131 additions & 2 deletions

ddtrace/internal/writer/writer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def __init__(
225225
processing_interval = config._trace_writer_interval_seconds
226226
if timeout is None:
227227
timeout = agent_config.trace_agent_timeout_seconds
228-
super(HTTPWriter, self).__init__(interval=processing_interval)
228+
super(HTTPWriter, self).__init__(interval=processing_interval, autorestart=False)
229229
self.intake_url = intake_url
230230
self._intake_accepts_gzip = use_gzip
231231
self._buffer_size = buffer_size
@@ -707,7 +707,7 @@ def __init__(
707707
if "X-Datadog-Test-Session-Token" in additional_header:
708708
test_session_token = additional_header["X-Datadog-Test-Session-Token"]
709709

710-
super(NativeWriter, self).__init__(interval=processing_interval)
710+
super(NativeWriter, self).__init__(interval=processing_interval, autorestart=False)
711711
self.intake_url = intake_url
712712
self._otlp_endpoint = otlp_endpoint
713713
self._buffer_size = buffer_size
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
tracing: This fix resolves an issue where traces buffered before ``os.fork()`` could be sent twice, once by the parent and once by the child.

tests/integration/test_integration_snapshots.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,3 +501,44 @@ def test_signal_shutdown_flushes_traces(
501501
if proc.poll() is None:
502502
proc.kill()
503503
proc.wait()
504+
505+
506+
@pytest.mark.subprocess(err=None, env={"DD_TRACE_WRITER_INTERVAL_SECONDS": "30"})
507+
@pytest.mark.snapshot()
508+
def test_buffered_trace_not_duplicated_across_fork():
509+
"""A trace buffered in the parent before fork() must be sent exactly once."""
510+
import os
511+
512+
from ddtrace.trace import tracer
513+
514+
with tracer.trace("buffered-before-fork", service="fork-test"):
515+
pass
516+
517+
pid = os.fork()
518+
if pid == 0:
519+
os._exit(0)
520+
521+
os.waitpid(pid, 0)
522+
tracer.flush()
523+
524+
525+
@pytest.mark.subprocess(err=None, env={"DD_TRACE_WRITER_INTERVAL_SECONDS": "30"})
526+
@pytest.mark.snapshot()
527+
def test_writer_restarts_in_child_and_flushes_traces_after_fork():
528+
"""After fork the child can create and flush its own traces while the parent trace is sent exactly once."""
529+
import os
530+
531+
from ddtrace.trace import tracer
532+
533+
with tracer.trace("parent-span", service="fork-test"):
534+
pass
535+
536+
pid = os.fork()
537+
if pid == 0:
538+
with tracer.trace("child-span", service="fork-test"):
539+
pass
540+
tracer.flush()
541+
os._exit(0)
542+
543+
os.waitpid(pid, 0)
544+
tracer.flush()
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[[
2+
{
3+
"name": "buffered-before-fork",
4+
"service": "fork-test",
5+
"resource": "buffered-before-fork",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "",
10+
"error": 0,
11+
"meta": {
12+
"_dd.base_service": "ddtrace_subprocess_dir",
13+
"_dd.p.dm": "-0",
14+
"_dd.p.tid": "6a27e37000000000",
15+
"_dd.svc_src": "m",
16+
"_dd.tags.process": "entrypoint.basedir:ddtrace_subprocess_dir,entrypoint.name:tmpfe9lhcj6,entrypoint.type:script,entrypoint.workdir:dd-trace-py,svc.auto:ddtrace_subprocess_dir",
17+
"language": "python",
18+
"runtime-id": "47d2ebc065d54d29881077fd2cb54d21"
19+
},
20+
"metrics": {
21+
"_dd.top_level": 1.0,
22+
"_dd.tracer_kr": 1.0,
23+
"_sampling_priority_v1": 1.0,
24+
"process_id": 91747.0
25+
},
26+
"duration": 2233000,
27+
"start": 1780999024913878000
28+
}]]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
[[
2+
{
3+
"name": "parent-span",
4+
"service": "fork-test",
5+
"resource": "parent-span",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "",
10+
"error": 0,
11+
"meta": {
12+
"_dd.base_service": "ddtrace_subprocess_dir",
13+
"_dd.p.dm": "-0",
14+
"_dd.p.tid": "6a27e35500000000",
15+
"_dd.svc_src": "m",
16+
"_dd.tags.process": "entrypoint.basedir:ddtrace_subprocess_dir,entrypoint.name:tmpuq52vj3g,entrypoint.type:script,entrypoint.workdir:dd-trace-py,svc.auto:ddtrace_subprocess_dir",
17+
"language": "python",
18+
"runtime-id": "e9137f90938e47d99d45971937f4020d"
19+
},
20+
"metrics": {
21+
"_dd.top_level": 1.0,
22+
"_dd.tracer_kr": 1.0,
23+
"_sampling_priority_v1": 1.0,
24+
"process_id": 91153.0
25+
},
26+
"duration": 2218000,
27+
"start": 1780998997002290000
28+
}],
29+
[
30+
{
31+
"name": "child-span",
32+
"service": "fork-test",
33+
"resource": "child-span",
34+
"trace_id": 1,
35+
"span_id": 1,
36+
"parent_id": 0,
37+
"type": "",
38+
"error": 0,
39+
"meta": {
40+
"_dd.base_service": "ddtrace_subprocess_dir",
41+
"_dd.p.dm": "-0",
42+
"_dd.p.tid": "6a27e35500000000",
43+
"_dd.svc_src": "m",
44+
"_dd.tags.process": "entrypoint.basedir:ddtrace_subprocess_dir,entrypoint.name:tmpuq52vj3g,entrypoint.type:script,entrypoint.workdir:dd-trace-py,svc.auto:ddtrace_subprocess_dir",
45+
"language": "python",
46+
"runtime-id": "53202310d1fd4f50a4eaa411ae87fa56"
47+
},
48+
"metrics": {
49+
"_dd.top_level": 1.0,
50+
"_dd.tracer_kr": 1.0,
51+
"_sampling_priority_v1": 1.0,
52+
"process_id": 91156.0
53+
},
54+
"duration": 95000,
55+
"start": 1780998997098421000
56+
}]]

0 commit comments

Comments
 (0)