Skip to content

Commit ff58376

Browse files
committed
Propagate route harvest cancellation
Pass harvest contexts through Java
1 parent f0d0f4d commit ff58376

5 files changed

Lines changed: 168 additions & 37 deletions

File tree

pkg/internal/transform/route/harvest/harvester.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ package harvest // import "go.opentelemetry.io/obi/pkg/internal/transform/route/
55

66
import (
77
"context"
8+
"errors"
89
"log/slog"
910
"os"
1011
"path/filepath"
11-
"runtime"
1212
"strings"
1313
"sync"
1414
"time"
@@ -29,7 +29,7 @@ type RouteHarvester struct {
2929
mux *sync.Mutex
3030

3131
// testing related
32-
javaExtractRoutes func(pid app.PID) (*RouteHarvesterResult, error)
32+
javaExtractRoutes func(ctx context.Context, pid app.PID) (*RouteHarvesterResult, error)
3333
nodeExtractRoutes func(pid app.PID) (*RouteHarvesterResult, error)
3434
}
3535

@@ -74,7 +74,7 @@ func NewRouteHarvester(cfg *services.RouteHarvestingConfig, disabled []services.
7474
mux: &sync.Mutex{},
7575
}
7676

77-
h.javaExtractRoutes = h.java.ExtractRoutes
77+
h.javaExtractRoutes = h.java.ExtractRoutesContext
7878
h.nodeExtractRoutes = ExtractNodejsRoutes
7979

8080
return h
@@ -97,14 +97,6 @@ func (h *RouteHarvester) HarvestRoutes(fileInfo *exec.FileInfo) (*RouteHarvester
9797

9898
resultChan := make(chan result, 1)
9999

100-
// We need to fix this in the downstream library and then we can remove this code
101-
if fileInfo.SDKLanguage() == svc.InstrumentableJava {
102-
runtime.LockOSThread()
103-
defer runtime.UnlockOSThread()
104-
h.java.Attacher.Init()
105-
defer h.java.Attacher.Cleanup()
106-
}
107-
108100
// Run the harvesting in a goroutine
109101
go func() {
110102
defer func() {
@@ -117,7 +109,7 @@ func (h *RouteHarvester) HarvestRoutes(fileInfo *exec.FileInfo) (*RouteHarvester
117109
switch fileInfo.SDKLanguage() {
118110
case svc.InstrumentableJava:
119111
if _, ok := h.disabled[svc.InstrumentableJava]; !ok {
120-
r, err := h.javaExtractRoutes(fileInfo.Pid())
112+
r, err := h.javaExtractRoutes(ctx, fileInfo.Pid())
121113
if err != nil {
122114
resultChan <- result{err: err}
123115
return
@@ -147,6 +139,10 @@ func (h *RouteHarvester) HarvestRoutes(fileInfo *exec.FileInfo) (*RouteHarvester
147139
// Wait for either completion or timeout
148140
select {
149141
case result := <-resultChan:
142+
if errors.Is(result.err, context.DeadlineExceeded) {
143+
h.log.Warn("route harvesting timed out", "timeout", h.timeout, "pid", fileInfo.Pid())
144+
return nil, &HarvestError{Message: "route harvesting timed out"}
145+
}
150146
return result.r, result.err
151147
case <-ctx.Done():
152148
h.log.Warn("route harvesting timed out", "timeout", h.timeout, "pid", fileInfo.Pid())

pkg/internal/transform/route/harvest/harvester_test.go

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package harvest
55

66
import (
7+
"context"
78
"errors"
89
"os"
910
"path/filepath"
@@ -21,50 +22,58 @@ import (
2122
)
2223

2324
// successfulExtractRoutes simulates a successful route extraction
24-
func successfulExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
25+
func successfulExtractRoutes(context.Context, app.PID) (*RouteHarvesterResult, error) {
2526
return &RouteHarvesterResult{
2627
Routes: []string{"/api/users", "/api/orders"},
2728
Kind: CompleteRoutes,
2829
}, nil
2930
}
3031

3132
// errorExtractRoutes simulates an error during route extraction
32-
func errorExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
33+
func errorExtractRoutes(context.Context, app.PID) (*RouteHarvesterResult, error) {
3334
return nil, errors.New("failed to connect to Java process")
3435
}
3536

3637
// timeoutExtractRoutes simulates a slow operation that will timeout
37-
func timeoutExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
38-
// Sleep longer than any reasonable timeout
39-
time.Sleep(5 * time.Second)
40-
return &RouteHarvesterResult{
41-
Routes: []string{"/api/delayed"},
42-
Kind: CompleteRoutes,
43-
}, nil
38+
func timeoutExtractRoutes(ctx context.Context, _ app.PID) (*RouteHarvesterResult, error) {
39+
<-ctx.Done()
40+
return nil, ctx.Err()
4441
}
4542

4643
// panicExtractRoutes simulates a panic during route extraction
47-
func panicExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
44+
func panicExtractRoutes(context.Context, app.PID) (*RouteHarvesterResult, error) {
4845
panic("unexpected error in java route extraction")
4946
}
5047

5148
// slowButSuccessfulExtractRoutes simulates a slow but successful operation
52-
func slowButSuccessfulExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
53-
time.Sleep(50 * time.Millisecond) // Slow but within timeout
49+
func slowButSuccessfulExtractRoutes(ctx context.Context, _ app.PID) (*RouteHarvesterResult, error) {
50+
select {
51+
case <-ctx.Done():
52+
return nil, ctx.Err()
53+
case <-time.After(50 * time.Millisecond): // Slow but within timeout
54+
}
5455
return &RouteHarvesterResult{
5556
Routes: []string{"/api/slow"},
5657
Kind: PartialRoutes,
5758
}, nil
5859
}
5960

6061
// emptyResultExtractRoutes simulates successful extraction with no routes
61-
func emptyResultExtractRoutes(app.PID) (*RouteHarvesterResult, error) {
62+
func emptyResultExtractRoutes(context.Context, app.PID) (*RouteHarvesterResult, error) {
6263
return &RouteHarvesterResult{
6364
Routes: []string{},
6465
Kind: CompleteRoutes,
6566
}, nil
6667
}
6768

69+
func successfulNodeExtractRoutes(pid app.PID) (*RouteHarvesterResult, error) {
70+
return successfulExtractRoutes(context.Background(), pid)
71+
}
72+
73+
func errorNodeExtractRoutes(pid app.PID) (*RouteHarvesterResult, error) {
74+
return errorExtractRoutes(context.Background(), pid)
75+
}
76+
6877
func createTestFileInfo(language svc.InstrumentableType) *exec.FileInfo {
6978
return exec.New(exec.Init{
7079
Pid: 12345,
@@ -172,7 +181,7 @@ func TestHarvestRoutes_EmptyResult(t *testing.T) {
172181
func TestHarvestRoutes_NonJavaLanguage(t *testing.T) {
173182
harvester := NewRouteHarvester(&services.RouteHarvestingConfig{}, []services.RouteHarvesterLanguage{}, 1*time.Second)
174183
// javaExtractRoutes should not be called for non-Java languages
175-
harvester.javaExtractRoutes = func(_ app.PID) (*RouteHarvesterResult, error) {
184+
harvester.javaExtractRoutes = func(_ context.Context, _ app.PID) (*RouteHarvesterResult, error) {
176185
t.Fatal("javaExtractRoutes should not be called for non-Java languages")
177186
return nil, nil
178187
}
@@ -187,7 +196,14 @@ func TestHarvestRoutes_NonJavaLanguage(t *testing.T) {
187196

188197
func TestHarvestRoutes_MultipleTimeouts(t *testing.T) {
189198
harvester := NewRouteHarvester(&services.RouteHarvestingConfig{}, []services.RouteHarvesterLanguage{}, 50*time.Millisecond)
190-
harvester.javaExtractRoutes = timeoutExtractRoutes
199+
200+
exited := make(chan struct{}, 3)
201+
harvester.javaExtractRoutes = func(ctx context.Context, pid app.PID) (*RouteHarvesterResult, error) {
202+
defer func() {
203+
exited <- struct{}{}
204+
}()
205+
return timeoutExtractRoutes(ctx, pid)
206+
}
191207

192208
fileInfo := createTestFileInfo(svc.InstrumentableJava)
193209

@@ -202,11 +218,15 @@ func TestHarvestRoutes_MultipleTimeouts(t *testing.T) {
202218
require.ErrorAs(t, err, &harvestErr, "iteration %d should return HarvestError", i)
203219
assert.Equal(t, "route harvesting timed out", harvestErr.Message, "iteration %d should have timeout message", i)
204220
}
221+
222+
require.Eventually(t, func() bool {
223+
return len(exited) == 3
224+
}, time.Second, time.Millisecond)
205225
}
206226

207227
func TestHarvestNodejsRoutes_Successful(t *testing.T) {
208228
harvester := NewRouteHarvester(&services.RouteHarvestingConfig{}, []services.RouteHarvesterLanguage{}, 1*time.Second)
209-
harvester.nodeExtractRoutes = successfulExtractRoutes
229+
harvester.nodeExtractRoutes = successfulNodeExtractRoutes
210230

211231
fileInfo := createTestFileInfo(svc.InstrumentableNodejs)
212232

@@ -220,7 +240,7 @@ func TestHarvestNodejsRoutes_Successful(t *testing.T) {
220240

221241
func TestHarvestNodejsRoutes_Error(t *testing.T) {
222242
harvester := NewRouteHarvester(&services.RouteHarvestingConfig{}, []services.RouteHarvesterLanguage{}, 1*time.Second)
223-
harvester.nodeExtractRoutes = errorExtractRoutes
243+
harvester.nodeExtractRoutes = errorNodeExtractRoutes
224244

225245
fileInfo := createTestFileInfo(svc.InstrumentableNodejs)
226246

pkg/internal/transform/route/harvest/java.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ package harvest // import "go.opentelemetry.io/obi/pkg/internal/transform/route/
88
import (
99
"bufio"
1010
"bytes"
11+
"context"
1112
"io"
1213
"log/slog"
1314
"net/url"
1415
"regexp"
16+
"runtime"
1517
"sort"
1618
"strings"
1719
"unicode"
@@ -28,7 +30,7 @@ type JavaRoutes struct {
2830
type JavaAttacher interface {
2931
Init()
3032
Cleanup()
31-
Attach(pid app.PID, argv []string, ignoreOnJ9 bool) (io.ReadCloser, error)
33+
AttachContext(ctx context.Context, pid app.PID, argv []string, ignoreOnJ9 bool) (io.ReadCloser, error)
3234
}
3335

3436
type RealJavaAttacher struct {
@@ -153,8 +155,19 @@ func (h *JavaRoutes) processSymbolLine(lineBytes []byte, routes []string) []stri
153155
}
154156

155157
func (h *JavaRoutes) ExtractRoutes(pid app.PID) (*RouteHarvesterResult, error) {
158+
return h.ExtractRoutesContext(context.Background(), pid)
159+
}
160+
161+
func (h *JavaRoutes) ExtractRoutesContext(ctx context.Context, pid app.PID) (*RouteHarvesterResult, error) {
156162
var routes []string
157-
out, err := h.Attacher.Attach(pid, []string{"jcmd", "VM.symboltable -verbose"}, true)
163+
164+
runtime.LockOSThread()
165+
defer runtime.UnlockOSThread()
166+
167+
h.Attacher.Init()
168+
defer h.Attacher.Cleanup()
169+
170+
out, err := h.Attacher.AttachContext(ctx, pid, []string{"jcmd", "VM.symboltable -verbose"}, true)
158171
if err != nil {
159172
return nil, err
160173
}
@@ -165,6 +178,10 @@ func (h *JavaRoutes) ExtractRoutes(pid app.PID) (*RouteHarvesterResult, error) {
165178
}
166179

167180
defer out.Close()
181+
stopClose := context.AfterFunc(ctx, func() {
182+
_ = out.Close()
183+
})
184+
defer stopClose()
168185

169186
reader := bufio.NewReader(out)
170187
for {
@@ -176,6 +193,9 @@ func (h *JavaRoutes) ExtractRoutes(pid app.PID) (*RouteHarvesterResult, error) {
176193
routes = h.processSymbolLine(line, routes)
177194
break
178195
}
196+
if ctxErr := ctx.Err(); ctxErr != nil {
197+
return nil, ctxErr
198+
}
179199
h.log.Error("error reading line", "error", err)
180200
return nil, err
181201
}
@@ -203,3 +223,7 @@ func (j RealJavaAttacher) Cleanup() {
203223
func (j RealJavaAttacher) Attach(pid app.PID, argv []string, ignoreOnJ9 bool) (io.ReadCloser, error) {
204224
return j.Attacher.Attach(int(pid), argv, ignoreOnJ9)
205225
}
226+
227+
func (j RealJavaAttacher) AttachContext(ctx context.Context, pid app.PID, argv []string, ignoreOnJ9 bool) (io.ReadCloser, error) {
228+
return j.Attacher.AttachContext(ctx, int(pid), argv, ignoreOnJ9)
229+
}

pkg/internal/transform/route/harvest/java_notlinux.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@
44
//go:build !linux
55

66
package harvest // import "go.opentelemetry.io/obi/pkg/internal/transform/route/harvest"
7-
import "go.opentelemetry.io/obi/pkg/appolly/app"
7+
8+
import (
9+
"context"
10+
11+
"go.opentelemetry.io/obi/pkg/appolly/app"
12+
)
813

914
type (
1015
JavaRoutes struct{ Attacher JavaAttacher }
@@ -17,7 +22,14 @@ type (
1722
func NewJavaRoutesHarvester() *JavaRoutes {
1823
return &JavaRoutes{Attacher: fakeAttacher{}}
1924
}
20-
func (h *JavaRoutes) ExtractRoutes(_ app.PID) (*RouteHarvesterResult, error) { return nil, nil }
25+
26+
func (h *JavaRoutes) ExtractRoutes(_ app.PID) (*RouteHarvesterResult, error) {
27+
return nil, nil
28+
}
29+
30+
func (h *JavaRoutes) ExtractRoutesContext(_ context.Context, _ app.PID) (*RouteHarvesterResult, error) {
31+
return nil, nil
32+
}
2133

2234
type fakeAttacher struct{}
2335

0 commit comments

Comments
 (0)