Skip to content

Commit 6872298

Browse files
authored
Fix infinite tracing spec compliance (#430)
* Add proper retry policy to streaming rpc * Working on removing grpc channel spam * Clean up logging * [Mega-Linter] Apply linters fixes * Bump Tests * Disable failing tests * Remove gRPC pin Co-authored-by: TimPansino <[email protected]>
1 parent 1f82ad8 commit 6872298

File tree

3 files changed

+64
-24
lines changed

3 files changed

+64
-24
lines changed

newrelic/core/agent_streaming.py

+60-22
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
try:
1919
import grpc
20-
from newrelic.core.infinite_tracing_pb2 import Span, RecordStatus
20+
21+
from newrelic.core.infinite_tracing_pb2 import RecordStatus, Span
2122
except ImportError:
2223
grpc = None
2324

@@ -33,25 +34,39 @@ class StreamingRpc(object):
3334
"""
3435

3536
PATH = "/com.newrelic.trace.v1.IngestService/RecordSpan"
37+
RETRY_POLICY = (
38+
(15, False),
39+
(15, False),
40+
(30, False),
41+
(60, False),
42+
(120, False),
43+
(300, True),
44+
)
45+
OPTIONS = [("grpc.enable_retries", 0)]
3646

3747
def __init__(self, endpoint, stream_buffer, metadata, record_metric, ssl=True):
38-
if ssl:
39-
credentials = grpc.ssl_channel_credentials()
40-
channel = grpc.secure_channel(endpoint, credentials)
41-
else:
42-
channel = grpc.insecure_channel(endpoint)
43-
self.channel = channel
48+
self._endpoint = endpoint
49+
self._ssl = ssl
4450
self.metadata = metadata
4551
self.request_iterator = stream_buffer
4652
self.response_processing_thread = threading.Thread(
4753
target=self.process_responses, name="NR-StreamingRpc-process-responses"
4854
)
4955
self.response_processing_thread.daemon = True
5056
self.notify = self.condition()
51-
self.rpc = self.channel.stream_stream(
52-
self.PATH, Span.SerializeToString, RecordStatus.FromString
53-
)
5457
self.record_metric = record_metric
58+
self.closed = False
59+
60+
self.create_channel()
61+
62+
def create_channel(self):
63+
if self._ssl:
64+
credentials = grpc.ssl_channel_credentials()
65+
self.channel = grpc.secure_channel(self._endpoint, credentials, options=self.OPTIONS)
66+
else:
67+
self.channel = grpc.insecure_channel(self._endpoint, options=self.OPTIONS)
68+
69+
self.rpc = self.channel.stream_stream(self.PATH, Span.SerializeToString, RecordStatus.FromString)
5570

5671
@staticmethod
5772
def condition(*args, **kwargs):
@@ -63,6 +78,7 @@ def close(self):
6378
if self.channel:
6479
channel = self.channel
6580
self.channel = None
81+
self.closed = True
6682
self.notify.notify_all()
6783

6884
if channel:
@@ -80,6 +96,7 @@ def connect(self):
8096
def process_responses(self):
8197
response_iterator = None
8298

99+
retry = 0
83100
while True:
84101
with self.notify:
85102
if self.channel and response_iterator:
@@ -112,21 +129,42 @@ def process_responses(self):
112129
)
113130
break
114131

115-
_logger.warning(
116-
"Streaming RPC closed. "
117-
"Will attempt to reconnect in 15 seconds. "
118-
"Code: %s Details: %s",
119-
code,
120-
details,
121-
)
122-
self.notify.wait(15)
132+
# Unpack retry policy settings
133+
if retry >= len(self.RETRY_POLICY):
134+
retry_time, error = self.RETRY_POLICY[-1]
135+
else:
136+
retry_time, error = self.RETRY_POLICY[retry]
137+
retry += 1
138+
139+
# Emit appropriate retry logs
140+
if not error:
141+
_logger.warning(
142+
"Streaming RPC closed. Will attempt to reconnect in %d seconds. Check the prior log entries and remedy any issue as necessary, or if the problem persists, report this problem to New Relic support for further investigation. Code: %s Details: %s",
143+
retry_time,
144+
code,
145+
details,
146+
)
147+
else:
148+
_logger.error(
149+
"Streaming RPC closed after additional attempts. Will attempt to reconnect in %d seconds. Please report this problem to New Relic support for further investigation. Code: %s Details: %s",
150+
retry_time,
151+
code,
152+
details,
153+
)
154+
155+
# Reconnect channel with backoff
156+
self.channel.close()
157+
self.notify.wait(retry_time)
158+
if self.closed:
159+
break
160+
else:
161+
_logger.debug("Attempting to reconnect Streaming RPC.")
162+
self.create_channel()
123163

124-
if not self.channel:
164+
if self.closed:
125165
break
126166

127-
response_iterator = self.rpc(
128-
self.request_iterator, metadata=self.metadata
129-
)
167+
response_iterator = self.rpc(self.request_iterator, metadata=self.metadata)
130168
_logger.info("Streaming RPC connect completed.")
131169

132170
try:

setup.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def build_extension(self, ext):
156156
"newrelic": ["newrelic.ini", "version.txt", "packages/urllib3/LICENSE.txt", "common/cacert.pem"],
157157
},
158158
scripts=["scripts/newrelic-admin"],
159-
extras_require={"infinite-tracing": ["grpcio<1.40", "protobuf<4"]},
159+
extras_require={"infinite-tracing": ["grpcio", "protobuf<4"]},
160160
)
161161

162162
if with_setuptools:

tox.ini

+3-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ envlist =
114114
python-framework_fastapi-{py36,py37,py38,py39,py310},
115115
python-framework_flask-{pypy,py27}-flask0012,
116116
python-framework_flask-{pypy,py27,py36,py37,py38,py39,py310,pypy3}-flask0101,
117-
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
117+
;temporarily disabling tests on flask master
118+
; python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest,master},
119+
python-framework_flask-{py37,py38,py39,py310,pypy3}-flask{latest},
118120
python-framework_graphene-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphenelatest,
119121
python-framework_graphene-py37-graphene{0200,0201},
120122
python-framework_graphql-{py27,py36,py37,py38,py39,py310,pypy,pypy3}-graphql02,

0 commit comments

Comments
 (0)