Skip to content

Commit 5a12f88

Browse files
authored
Feat celery plugins (#704)
* add py-standalone example * add py-immich example * add celery plugins * fix wrong spell * celery rabbitmq plugins
1 parent b675083 commit 5a12f88

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1573
-45
lines changed

plugins/PY/pinpointPy/Common.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
# Created by eeliu at 3/5/20
2121

22-
from pinpointPy import Defines, pinpoint, get_logger
22+
from pinpointPy import Defines, pinpoint
23+
from pinpointPy.pinpoint import get_logger
2324
from pinpointPy.TraceContext import get_trace_context
2425
from functools import wraps
2526

@@ -69,7 +70,7 @@ def pinpointTrace(*args, **kwargs):
6970

7071
class PinTrace:
7172

72-
def __init__(self, name):
73+
def __init__(self, name=''):
7374
self.name = name
7475

7576
def setCurrentTraceNodeId(self, traceId):
@@ -125,7 +126,10 @@ def pinpointTrace(*args, **kwargs):
125126
return pinpointTrace
126127

127128
def getUniqueName(self):
128-
return self.name
129+
if self.name:
130+
return self.name
131+
else:
132+
return self.func_name
129133

130134

131135
class AsyncPinTrace(PinTrace):
@@ -189,8 +193,10 @@ def __init__(self) -> None:
189193
self.ParentName = ''
190194
# REMOTE_ADDRESS field (the same as RemoteAddr)
191195
self.ParentHost = ''
192-
# create a new one, or use an exist parents Tid, this Tid used to generate call-tree
196+
# create a new one, or use an exist parents Tid, this Tid is used to generate call-tree
193197
self.ParentTid = ''
198+
# create a new one, or use an exist parent sequence id
199+
self.ParentSid = ''
194200
# if find a error, just fill here. it could rise an error(read mark) in pinpoint-web
195201
self.Error = ''
196202

@@ -292,13 +298,13 @@ def onBefore(self, parentId: int, *args, **kwargs):
292298
tid = ''
293299
if header.ParentTid != '':
294300
tid = header.ParentTid
295-
pinpoint.add_trace_header(Defines.PP_PARENT_SPAN_ID, tid, traceId)
301+
# pinpoint.add_trace_header(Defines.PP_PARENT_SPAN_ID, tid, traceId)
296302
else:
297303
tid = pinpoint.gen_tid()
298304

299-
pinpoint.add_trace_header(Defines.PP_TRANSCATION_ID, tid, traceId)
305+
pinpoint.add_trace_header(Defines.PP_TRANSACTION_ID, tid, traceId)
300306

301-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, tid, traceId)
307+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, tid, traceId)
302308

303309
if header.Error:
304310
pinpoint.mark_as_error(header.Error, header.Error, 0, traceId)

plugins/PY/pinpointPy/CommonPlugin.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def onBefore(self, parentId: int, *args, **kwargs):
7272

7373
sequence_id = pinpoint.get_sequence_id(traceId)
7474

75-
tid = pinpoint.get_context(Defines.PP_TRANSCATION_ID, traceId)
75+
tid = pinpoint.get_context(Defines.PP_TRANSACTION_ID, traceId)
7676
seq_id = pinpoint.get_context(Defines.PP_SPAN_ID, traceId)
7777
app_name = pinpoint.get_context(Defines.PP_APP_NAME, traceId)
7878
app_id = pinpoint.get_context(Defines.PP_APP_ID, traceId)
@@ -100,17 +100,16 @@ def pp_new_entry_func(*args, **kwargs):
100100
Defines.PP_SPAN_ID, seq_id, thread_trace_id)
101101

102102
pinpoint.add_trace_header(
103-
Defines.PP_TRANSCATION_ID, tid, thread_trace_id)
103+
Defines.PP_TRANSACTION_ID, tid, thread_trace_id)
104104
pinpoint.add_context(
105-
Defines.PP_TRANSCATION_ID, tid, thread_trace_id)
105+
Defines.PP_TRANSACTION_ID, tid, thread_trace_id)
106106

107107
pinpoint.add_trace_header(
108108
Defines.PP_SERVER_TYPE, Defines.PYTHON, thread_trace_id)
109109
pinpoint.set_async_context(
110110
thread_trace_id, async_id, sequence_id)
111111

112112
if callable(origin_target):
113-
# todo add
114113
@PinpointCommonPlugin(origin_target.__name__)
115114
def call_origin_target(*args, **kwargs):
116115
origin_target(*args, **kwargs)

plugins/PY/pinpointPy/Defines.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060

6161
PP_NGINX_PROXY = 'NP'
6262
PP_APACHE_PROXY = 'AP'
63-
PP_TRANSCATION_ID = 'tid'
63+
PP_TRANSACTION_ID = 'tid'
6464
PP_SPAN_ID = 'sid'
6565
PP_NOT_SAMPLED = 's0'
6666
PP_SAMPLED = 's1'
@@ -74,6 +74,7 @@
7474
PYTHON = '1700'
7575
PP_METHOD_CALL = '1701'
7676
PP_CELERY = '1702'
77+
PP_CELERY_WORKER = '1703'
7778

7879
PP_REMOTE_METHOD = '9900'
7980
P_INVOCATION_CALL_TYPE = '100'

plugins/PY/pinpointPy/Django/BaseDjangoRequestPlugins.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def onBefore(self, parentId, *args, **kwargs):
7575
self.tid = headers[Defines.PP_HEADER_PINPOINT_TRACEID]
7676
else:
7777
self.tid = pinpoint.gen_tid()
78-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
78+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
7979

8080
if Defines.PP_HTTP_PINPOINT_PAPPNAME in headers:
8181
self.pname = headers[Defines.PP_HTTP_PINPOINT_PAPPNAME]
@@ -133,9 +133,9 @@ def onBefore(self, parentId, *args, **kwargs):
133133
Defines.PP_HEADER_PINPOINT_SAMPLED, "s0", trace_id)
134134

135135
pinpoint.add_trace_header(
136-
Defines.PP_TRANSCATION_ID, self.tid, trace_id)
136+
Defines.PP_TRANSACTION_ID, self.tid, trace_id)
137137
pinpoint.add_trace_header(Defines.PP_SPAN_ID, self.sid, trace_id)
138-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
138+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
139139
pinpoint.add_context(Defines.PP_SPAN_ID, self.sid, trace_id)
140140
pinpoint.add_trace_header_v2(
141141
Defines.PP_HTTP_METHOD, headers["REQUEST_METHOD"], trace_id)

plugins/PY/pinpointPy/Fastapi/AsyRequestPlugin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ def onBefore(self, parentId, *args, **kwargs):
108108
pinpoint.drop_trace(traceId)
109109
pinpoint.add_context("Pinpoint-Sampled", "s0", traceId)
110110

111-
pinpoint.add_trace_header(Defines.PP_TRANSCATION_ID, tid, traceId)
112-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, tid, traceId)
111+
pinpoint.add_trace_header(Defines.PP_TRANSACTION_ID, tid, traceId)
112+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, tid, traceId)
113113
pinpoint.add_trace_header(Defines.PP_SPAN_ID, sid, traceId)
114114
pinpoint.add_context(Defines.PP_SPAN_ID, sid, traceId)
115115
return traceId, args, kwargs

plugins/PY/pinpointPy/Fastapi/PinTranscation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ def onBefore(self, parentId, *args, **kwargs):
108108
else:
109109
tid = pinpoint.gen_tid()
110110

111-
pinpoint.add_trace_header(Defines.PP_TRANSCATION_ID, tid, traceId)
112-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, tid, traceId)
111+
pinpoint.add_trace_header(Defines.PP_TRANSACTION_ID, tid, traceId)
112+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, tid, traceId)
113113

114114
if header.Error:
115115
pinpoint.mark_as_error(header.Error, header.Error, 0, traceId)

plugins/PY/pinpointPy/Flask/FlaskPlugins.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@ def onBefore(self, parentId, *args, **kwargs):
3838
pinpoint.add_trace_header(
3939
Defines.PP_APP_NAME, pinpoint.app_name(), traceId)
4040
pinpoint.add_context(Defines.PP_APP_NAME, pinpoint.app_name(), traceId)
41+
4142
pinpoint.add_trace_header(
4243
Defines.PP_APP_ID, pinpoint.app_id(), traceId)
44+
pinpoint.add_context(Defines.PP_APP_ID, pinpoint.app_id(), traceId)
45+
4346
###############################################################
4447
pinpoint.add_trace_header(
4548
Defines.PP_INTERCEPTOR_NAME, 'BaseFlaskrequest', traceId)
@@ -125,8 +128,8 @@ def onBefore(self, parentId, *args, **kwargs):
125128
pinpoint.add_context(
126129
Defines.PP_HEADER_PINPOINT_SAMPLED, "s0", traceId)
127130

128-
pinpoint.add_trace_header(Defines.PP_TRANSCATION_ID, self.tid, traceId)
129-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, traceId)
131+
pinpoint.add_trace_header(Defines.PP_TRANSACTION_ID, self.tid, traceId)
132+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, traceId)
130133
pinpoint.add_trace_header(Defines.PP_SPAN_ID, self.sid, traceId)
131134
pinpoint.add_context(Defines.PP_SPAN_ID, self.sid, traceId)
132135
pinpoint.add_trace_header_v2(
@@ -138,6 +141,6 @@ def onEnd(self, traceId, ret):
138141
super().onEnd(traceId, ret)
139142
return ret
140143

141-
def onException(self, e):
142-
pinpoint.mark_as_error(str(e), "", 0)
144+
def onException(self, traceId, e):
145+
pinpoint.mark_as_error(str(e), "", 0, traceId)
143146
raise e

plugins/PY/pinpointPy/Helper.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def generatePinpointHeader(host, headers, traceId=-1):
3434
headers['Pinpoint-Flags'] = "0"
3535
headers[Defines.PP_HEADER_PINPOINT_HOST] = host
3636
headers[Defines.PP_HEADER_PINPOINT_TRACEID] = pinpoint.get_context(
37-
Defines.PP_TRANSCATION_ID, traceId)
37+
Defines.PP_TRANSACTION_ID, traceId)
3838
headers[Defines.PP_HEADER_PINPOINT_PSPANID] = pinpoint.get_context(
3939
Defines.PP_SPAN_ID, traceId)
4040
nextSeqId = pinpoint.gen_sid()
@@ -128,8 +128,8 @@ def startPinpointByEnviron(environ, trace_id: int):
128128
pinpoint.add_context(
129129
Defines.PP_HEADER_PINPOINT_SAMPLED, "s0", trace_id)
130130

131-
pinpoint.add_trace_header(Defines.PP_TRANSCATION_ID, tid, trace_id)
132-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, tid, trace_id)
131+
pinpoint.add_trace_header(Defines.PP_TRANSACTION_ID, tid, trace_id)
132+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, tid, trace_id)
133133

134134
pinpoint.add_trace_header(Defines.PP_SPAN_ID, sid, trace_id)
135135
pinpoint.add_context(Defines.PP_SPAN_ID, sid, trace_id)

plugins/PY/pinpointPy/RequestPlugins.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def onBefore(self, parentId, *args, **kwargs):
7272
self.tid = request.headers[Defines.PP_HEADER_PINPOINT_TRACEID]
7373
else:
7474
self.tid = pinpoint.gen_tid()
75-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
75+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
7676

7777
if Defines.PP_HTTP_PINPOINT_PAPPNAME in request.headers:
7878
self.pname = request.headers[Defines.PP_HTTP_PINPOINT_PAPPNAME]
@@ -131,9 +131,9 @@ def onBefore(self, parentId, *args, **kwargs):
131131
Defines.PP_HEADER_PINPOINT_SAMPLED, "s0", trace_id)
132132

133133
pinpoint.add_trace_header(
134-
Defines.PP_TRANSCATION_ID, self.tid, trace_id)
134+
Defines.PP_TRANSACTION_ID, self.tid, trace_id)
135135
pinpoint.add_trace_header(Defines.PP_SPAN_ID, self.sid, trace_id)
136-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
136+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
137137
pinpoint.add_context(Defines.PP_SPAN_ID, self.sid, trace_id)
138138
return trace_id, args, kwargs
139139

plugins/PY/pinpointPy/grpc_/GrpcRequestPlugins.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ def onBefore(self, parentId, *args, **kwargs):
5151
self.sid = pinpoint.gen_sid()
5252
pinpoint.add_context(Defines.PP_SPAN_ID, self.sid, trace_id)
5353
self.tid = pinpoint.gen_tid()
54-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
54+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
5555
pinpoint.add_trace_header(
56-
Defines.PP_TRANSCATION_ID, self.tid, trace_id)
56+
Defines.PP_TRANSACTION_ID, self.tid, trace_id)
5757
pinpoint.add_trace_header(Defines.PP_SPAN_ID, self.sid, trace_id)
58-
pinpoint.add_context(Defines.PP_TRANSCATION_ID, self.tid, trace_id)
58+
pinpoint.add_context(Defines.PP_TRANSACTION_ID, self.tid, trace_id)
5959
pinpoint.add_context(Defines.PP_SPAN_ID, self.sid, trace_id)
6060
return trace_id, args, kwargs
6161

0 commit comments

Comments
 (0)