Skip to content

Commit c0a7f99

Browse files
authored
OTEL annotates gRPC responses (temporalio#7165)
## What changed? <!-- Describe what has changed in this PR --> Added support for our OTEL interceptor to parse tags from the gRPC response (not just the request). ## Why? <!-- Tell your future self why have you made these changes --> Without annotating the responses, traces from calls like `PollWorkflowTaskQueue` cannot be queried for since the ID is only on the response but not the request. ## How did you test it? <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> Verified in Tempo: <img width="1573" alt="image" src="https://github.com/user-attachments/assets/96a1f1b3-99f5-48f4-b603-b0b66f158538" /> ## Potential risks <!-- Assuming the worst case, what can be broken when deploying this change to production? --> There is one other caller of `Extract`, but they are only feeding in the request payload. So no behavior change is expected. ## Documentation <!-- Have you made sure this change doesn't falsify anything currently stated in `docs/`? If significant new behavior is added, have you described that in `docs/`? --> ## Is hotfix candidate? <!-- Is this PR a hotfix candidate or does it require a notification to be sent to the broader community? (Yes/No) -->
1 parent 519fb71 commit c0a7f99

File tree

7 files changed

+556
-71
lines changed

7 files changed

+556
-71
lines changed

cmd/tools/genrpcserverinterceptors/main.go

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ import (
4343
"go.temporal.io/server/api/matchingservice/v1"
4444
)
4545

46-
const maxRequestDepth = 5
46+
const maxMessageDepth = 5
4747

4848
type (
49-
requestData struct {
49+
messageData struct {
5050
Type string
5151

5252
WorkflowIdGetter string
@@ -57,7 +57,7 @@ type (
5757
grpcServerData struct {
5858
Server string
5959
Imports []string
60-
Requests []requestData
60+
Messages []messageData
6161
}
6262
)
6363

@@ -125,9 +125,9 @@ import (
125125
"go.temporal.io/server/common/log/tag"
126126
)
127127
128-
func (wt *WorkflowTags) extractFrom{{.Server}}Request(req any) []tag.Tag {
129-
switch r := req.(type) {
130-
{{- range .Requests}}
128+
func (wt *WorkflowTags) extractFrom{{.Server}}Message(message any) []tag.Tag {
129+
switch r := message.(type) {
130+
{{- range .Messages}}
131131
case {{.Type}}:
132132
{{- if or .TaskTokenGetter .WorkflowIdGetter .RunIdGetter}}
133133
{{- if .TaskTokenGetter}}
@@ -162,55 +162,60 @@ func writeGrpcServerData(w io.Writer, grpcServerT reflect.Type, tmpl string) {
162162
if rpcT.NumIn() < 2 {
163163
continue
164164
}
165-
requestT := rpcT.In(1) // Assume request is always the second parameter.
166165

167-
rd := workflowTagGetters(requestT, 0)
168-
rd.Type = requestT.String()
169-
sd.Requests = append(sd.Requests, rd)
166+
requestT := rpcT.In(1) // Assume request is always the second parameter.
167+
requestMd := workflowTagGetters(requestT, 0)
168+
requestMd.Type = requestT.String()
169+
sd.Messages = append(sd.Messages, requestMd)
170+
171+
respT := rpcT.Out(0) // Assume response is always the first parameter.
172+
responseMd := workflowTagGetters(respT, 0)
173+
responseMd.Type = respT.String()
174+
sd.Messages = append(sd.Messages, responseMd)
170175
}
171176

172177
fatalIfErr(template.Must(template.New("code").Parse(tmpl)).Execute(w, sd))
173178
}
174179

175180
//nolint:revive // cognitive complexity 37 (> max enabled 25)
176-
func workflowTagGetters(requestT reflect.Type, depth int) requestData {
177-
rd := requestData{}
178-
if depth > maxRequestDepth {
179-
return rd
181+
func workflowTagGetters(messageType reflect.Type, depth int) messageData {
182+
pd := messageData{}
183+
if depth > maxMessageDepth {
184+
return pd
180185
}
181186

182187
switch {
183-
case requestT.AssignableTo(executionGetterT):
184-
rd.WorkflowIdGetter = "GetExecution().GetWorkflowId()"
185-
rd.RunIdGetter = "GetExecution().GetRunId()"
186-
case requestT.AssignableTo(workflowExecutionGetterT):
187-
rd.WorkflowIdGetter = "GetWorkflowExecution().GetWorkflowId()"
188-
rd.RunIdGetter = "GetWorkflowExecution().GetRunId()"
189-
case requestT.AssignableTo(taskTokenGetterT):
188+
case messageType.AssignableTo(executionGetterT):
189+
pd.WorkflowIdGetter = "GetExecution().GetWorkflowId()"
190+
pd.RunIdGetter = "GetExecution().GetRunId()"
191+
case messageType.AssignableTo(workflowExecutionGetterT):
192+
pd.WorkflowIdGetter = "GetWorkflowExecution().GetWorkflowId()"
193+
pd.RunIdGetter = "GetWorkflowExecution().GetRunId()"
194+
case messageType.AssignableTo(taskTokenGetterT):
190195
for _, ert := range excludeTaskTokenTypes {
191-
if requestT.AssignableTo(ert) {
192-
return rd
196+
if messageType.AssignableTo(ert) {
197+
return pd
193198
}
194199
}
195-
rd.TaskTokenGetter = "GetTaskToken()"
200+
pd.TaskTokenGetter = "GetTaskToken()"
196201
default:
197202
// Might be one of these, both, or neither.
198-
if requestT.AssignableTo(workflowIdGetterT) {
199-
rd.WorkflowIdGetter = "GetWorkflowId()"
203+
if messageType.AssignableTo(workflowIdGetterT) {
204+
pd.WorkflowIdGetter = "GetWorkflowId()"
200205
}
201-
if requestT.AssignableTo(runIdGetterT) {
202-
rd.RunIdGetter = "GetRunId()"
206+
if messageType.AssignableTo(runIdGetterT) {
207+
pd.RunIdGetter = "GetRunId()"
203208
}
204209
}
205210

206211
// Iterates over fields in order they defined in proto file, not proto index.
207212
// Order is important because the first match wins.
208-
for fieldNum := 0; fieldNum < requestT.Elem().NumField(); fieldNum++ {
209-
if (rd.WorkflowIdGetter != "" && rd.RunIdGetter != "") || rd.TaskTokenGetter != "" {
213+
for fieldNum := 0; fieldNum < messageType.Elem().NumField(); fieldNum++ {
214+
if (pd.WorkflowIdGetter != "" && pd.RunIdGetter != "") || pd.TaskTokenGetter != "" {
210215
break
211216
}
212217

213-
nestedRequest := requestT.Elem().Field(fieldNum)
218+
nestedRequest := messageType.Elem().Field(fieldNum)
214219
if nestedRequest.Type.Kind() != reflect.Ptr {
215220
continue
216221
}
@@ -223,17 +228,17 @@ func workflowTagGetters(requestT reflect.Type, depth int) requestData {
223228

224229
nestedRd := workflowTagGetters(nestedRequest.Type, depth+1)
225230
// First match wins: if getter is already set, it won't be overwritten.
226-
if rd.WorkflowIdGetter == "" && nestedRd.WorkflowIdGetter != "" {
227-
rd.WorkflowIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.WorkflowIdGetter)
231+
if pd.WorkflowIdGetter == "" && nestedRd.WorkflowIdGetter != "" {
232+
pd.WorkflowIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.WorkflowIdGetter)
228233
}
229-
if rd.RunIdGetter == "" && nestedRd.RunIdGetter != "" {
230-
rd.RunIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.RunIdGetter)
234+
if pd.RunIdGetter == "" && nestedRd.RunIdGetter != "" {
235+
pd.RunIdGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.RunIdGetter)
231236
}
232-
if rd.TaskTokenGetter == "" && nestedRd.TaskTokenGetter != "" {
233-
rd.TaskTokenGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.TaskTokenGetter)
237+
if pd.TaskTokenGetter == "" && nestedRd.TaskTokenGetter != "" {
238+
pd.TaskTokenGetter = fmt.Sprintf("Get%s().%s", nestedRequest.Name, nestedRd.TaskTokenGetter)
234239
}
235240
}
236-
return rd
241+
return pd
237242
}
238243

239244
func callWithFile(generator func(io.Writer, reflect.Type), server reflect.Type, outPath string, licenseText string) {

common/rpc/interceptor/logtags/admin_service_server_gen.go

Lines changed: 86 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)