Skip to content

Commit 19471c4

Browse files
authored
Add async span API (#47)
1 parent adc5da2 commit 19471c4

File tree

10 files changed

+242
-22
lines changed

10 files changed

+242
-22
lines changed

docs/en/development-and-contribution/development-guide.md

+24
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,8 @@ After creating a Span, you can perform additional operations on it.
347347
```go
348348
// Span for plugin API
349349
type Span interface {
350+
// AsyncSpan for the async API
351+
AsyncSpan
350352
// Tag set the Tag of the Span
351353
Tag(Tag, string)
352354
// SetSpanLayer set the SpanLayer of the Span
@@ -364,6 +366,28 @@ type Span interface {
364366
}
365367
```
366368

369+
#### Async Span
370+
371+
There is a set of advanced APIs in Span which is specifically designed for async use cases.
372+
When setting name, tags, logs, and other operations (including end span) of the span in another goroutine, you should use these APIs.
373+
374+
```go
375+
type AsyncSpan interface {
376+
// PrepareAsync the span finished at current tracing context, but current span is still alive until AsyncFinish called
377+
PrepareAsync()
378+
// AsyncFinish to finished current async span
379+
AsyncFinish()
380+
}
381+
```
382+
383+
Following the previous API define, you should following these steps to use the async API:
384+
1. Call `span.PrepareAsync()` to prepare the span to do any operation in another goroutine.
385+
2. Use `Span.End()` in the original goroutine when your job in the current goroutine is complete.
386+
3. Propagate the span to any other goroutine in your plugin.
387+
4. Once the above steps are all set, call `span.AsyncFinish()` in any goroutine.
388+
5. When the `span.AsyncFinish()` is complete for all spans, the all spans would be finished and report to the backend.
389+
390+
367391
## Import Plugin
368392

369393
Once you have finished developing the plugin, you need to import the completed module into the Agent program and [define it in the corresponding file](../../../tools/go-agent/instrument/plugins/register.go).

plugins/core/span_default.go

+58-3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package core
1919

2020
import (
2121
"math"
22+
"sync"
2223
"time"
2324

2425
"github.com/apache/skywalking-go/plugins/core/reporter"
@@ -41,6 +42,10 @@ type DefaultSpan struct {
4142
IsError bool
4243
SpanType SpanType
4344
Parent TracingSpan
45+
46+
InAsyncMode bool
47+
AsyncModeFinished bool
48+
AsyncOpLocker *sync.Mutex
4449
}
4550

4651
func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
@@ -54,6 +59,10 @@ func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
5459

5560
// For TracingSpan
5661
func (ds *DefaultSpan) SetOperationName(name string) {
62+
if ds.InAsyncMode {
63+
ds.AsyncOpLocker.Lock()
64+
defer ds.AsyncOpLocker.Unlock()
65+
}
5766
ds.OperationName = name
5867
}
5968

@@ -62,6 +71,10 @@ func (ds *DefaultSpan) GetOperationName() string {
6271
}
6372

6473
func (ds *DefaultSpan) SetPeer(peer string) {
74+
if ds.InAsyncMode {
75+
ds.AsyncOpLocker.Lock()
76+
defer ds.AsyncOpLocker.Unlock()
77+
}
6578
ds.Peer = peer
6679
}
6780

@@ -70,6 +83,10 @@ func (ds *DefaultSpan) GetPeer() string {
7083
}
7184

7285
func (ds *DefaultSpan) SetSpanLayer(layer int32) {
86+
if ds.InAsyncMode {
87+
ds.AsyncOpLocker.Lock()
88+
defer ds.AsyncOpLocker.Unlock()
89+
}
7390
ds.Layer = agentv3.SpanLayer(layer)
7491
}
7592

@@ -78,6 +95,10 @@ func (ds *DefaultSpan) GetSpanLayer() agentv3.SpanLayer {
7895
}
7996

8097
func (ds *DefaultSpan) SetComponent(componentID int32) {
98+
if ds.InAsyncMode {
99+
ds.AsyncOpLocker.Lock()
100+
defer ds.AsyncOpLocker.Unlock()
101+
}
81102
ds.ComponentID = componentID
82103
}
83104

@@ -86,6 +107,10 @@ func (ds *DefaultSpan) GetComponent() int32 {
86107
}
87108

88109
func (ds *DefaultSpan) Tag(key, value string) {
110+
if ds.InAsyncMode {
111+
ds.AsyncOpLocker.Lock()
112+
defer ds.AsyncOpLocker.Unlock()
113+
}
89114
for _, tag := range ds.Tags {
90115
if tag.Key == key {
91116
tag.Value = value
@@ -96,6 +121,10 @@ func (ds *DefaultSpan) Tag(key, value string) {
96121
}
97122

98123
func (ds *DefaultSpan) Log(ll ...string) {
124+
if ds.InAsyncMode {
125+
ds.AsyncOpLocker.Lock()
126+
defer ds.AsyncOpLocker.Unlock()
127+
}
99128
data := make([]*commonv3.KeyStringValuePair, 0, int32(math.Ceil(float64(len(ll))/2.0)))
100129
var kvp *commonv3.KeyStringValuePair
101130
for i, l := range ll {
@@ -111,14 +140,20 @@ func (ds *DefaultSpan) Log(ll ...string) {
111140
}
112141

113142
func (ds *DefaultSpan) Error(ll ...string) {
143+
if ds.InAsyncMode {
144+
ds.AsyncOpLocker.Lock()
145+
defer ds.AsyncOpLocker.Unlock()
146+
}
114147
ds.IsError = true
115148
ds.Log(ll...)
116149
}
117150

118-
func (ds *DefaultSpan) End() {
151+
func (ds *DefaultSpan) End(changeParent bool) {
119152
ds.EndTime = time.Now()
120-
if ctx := getTracingContext(); ctx != nil {
121-
ctx.SaveActiveSpan(ds.Parent)
153+
if changeParent {
154+
if ctx := getTracingContext(); ctx != nil {
155+
ctx.SaveActiveSpan(ds.Parent)
156+
}
122157
}
123158
}
124159

@@ -137,3 +172,23 @@ func (ds *DefaultSpan) IsValid() bool {
137172
func (ds *DefaultSpan) ParentSpan() TracingSpan {
138173
return ds.Parent
139174
}
175+
176+
func (ds *DefaultSpan) PrepareAsync() {
177+
if ds.InAsyncMode {
178+
panic("already in async mode")
179+
}
180+
ds.InAsyncMode = true
181+
ds.AsyncModeFinished = false
182+
ds.AsyncOpLocker = &sync.Mutex{}
183+
}
184+
185+
func (ds *DefaultSpan) AsyncFinish() {
186+
if !ds.InAsyncMode {
187+
panic("not in async mode")
188+
}
189+
if ds.AsyncModeFinished {
190+
panic("already finished async")
191+
}
192+
ds.AsyncModeFinished = true
193+
ds.AsyncOpLocker = nil
194+
}

plugins/core/span_noop.go

+6
Original file line numberDiff line numberDiff line change
@@ -93,3 +93,9 @@ func (*NoopSpan) IsValid() bool {
9393
func (n *NoopSpan) ParentSpan() TracingSpan {
9494
return nil
9595
}
96+
97+
func (n *NoopSpan) PrepareAsync() {
98+
}
99+
100+
func (n *NoopSpan) AsyncFinish() {
101+
}

plugins/core/span_tracing.go

+39-8
Original file line numberDiff line numberDiff line change
@@ -100,12 +100,23 @@ func (s *SegmentSpanImpl) End() {
100100
if !s.IsValid() {
101101
return
102102
}
103-
s.DefaultSpan.End()
104-
go func() {
105-
s.SegmentContext.collect <- s
106-
}()
103+
s.DefaultSpan.End(true)
104+
s.end0()
107105
}
108106

107+
func (s *SegmentSpanImpl) AsyncFinish() {
108+
s.DefaultSpan.AsyncFinish()
109+
s.DefaultSpan.End(false)
110+
s.end0()
111+
}
112+
113+
func (s *SegmentSpanImpl) end0() {
114+
if !s.DefaultSpan.InAsyncMode || s.DefaultSpan.AsyncModeFinished {
115+
go func() {
116+
s.SegmentContext.collect <- s
117+
}()
118+
}
119+
}
109120
func (s *SegmentSpanImpl) GetDefaultSpan() *DefaultSpan {
110121
return &s.DefaultSpan
111122
}
@@ -220,10 +231,22 @@ func (rs *RootSegmentSpan) End() {
220231
if !rs.IsValid() {
221232
return
222233
}
223-
rs.DefaultSpan.End()
224-
go func() {
225-
rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
226-
}()
234+
rs.DefaultSpan.End(true)
235+
rs.end0()
236+
}
237+
238+
func (rs *RootSegmentSpan) AsyncFinish() {
239+
rs.DefaultSpan.AsyncFinish()
240+
rs.DefaultSpan.End(false)
241+
rs.end0()
242+
}
243+
244+
func (rs *RootSegmentSpan) end0() {
245+
if !rs.InAsyncMode || rs.AsyncModeFinished {
246+
go func() {
247+
rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
248+
}()
249+
}
227250
}
228251

229252
func (rs *RootSegmentSpan) createRootSegmentContext(ctx *TracingContext, _ SegmentSpan) (err error) {
@@ -295,6 +318,14 @@ func (s *SnapshotSpan) segmentRegister() bool {
295318
}
296319
}
297320

321+
func (s *SnapshotSpan) PrepareAsync() {
322+
panic("please use the PrepareAsync on right goroutine")
323+
}
324+
325+
func (s *SnapshotSpan) AsyncFinish() {
326+
panic("please use the AsyncFinish on right goroutine")
327+
}
328+
298329
func newSegmentRoot(segmentSpan *SegmentSpanImpl) *RootSegmentSpan {
299330
s := &RootSegmentSpan{
300331
SegmentSpanImpl: segmentSpan,

plugins/core/tracing.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/apache/skywalking-go/plugins/core/tracing"
2828
)
2929

30+
var snapshotType = reflect.TypeOf(&SnapshotSpan{})
31+
3032
func (t *Tracer) Tracing() interface{} {
3133
return t
3234
}
@@ -48,7 +50,7 @@ func (t *Tracer) CreateEntrySpan(operationName string, extractor interface{}, op
4850
saveSpanToActiveIfNotError(ctx, s, err)
4951
}()
5052
// if parent span is entry span, then use parent span as result
51-
if tracingSpan != nil && tracingSpan.IsEntry() {
53+
if tracingSpan != nil && tracingSpan.IsEntry() && reflect.ValueOf(tracingSpan).Type() != snapshotType {
5254
tracingSpan.SetOperationName(operationName)
5355
return tracingSpan, nil
5456
}
@@ -85,7 +87,7 @@ func (t *Tracer) CreateExitSpan(operationName, peer string, injector interface{}
8587
}()
8688

8789
// if parent span is exit span, then use parent span as result
88-
if tracingSpan != nil && tracingSpan.IsExit() {
90+
if tracingSpan != nil && tracingSpan.IsExit() && reflect.ValueOf(tracingSpan).Type() != snapshotType {
8991
return tracingSpan, nil
9092
}
9193
span, err := t.createSpan0(ctx, tracingSpan, opts, withSpanType(SpanTypeExit), withOperationName(operationName), withPeer(peer))

plugins/core/tracing/api.go

+4
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,7 @@ func (n *NoopSpan) Error(...string) {
188188
}
189189
func (n *NoopSpan) End() {
190190
}
191+
func (n *NoopSpan) PrepareAsync() {
192+
}
193+
func (n *NoopSpan) AsyncFinish() {
194+
}

plugins/core/tracing/bridge.go

+17
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,17 @@
1717

1818
package tracing
1919

20+
type AsyncSpan interface {
21+
// PrepareAsync the span finished at current tracing context, but current span is still alive until AsyncFinish called
22+
PrepareAsync()
23+
// AsyncFinish to finished current async span
24+
AsyncFinish()
25+
}
26+
2027
// AdaptSpan for adapt with agent core
2128
type AdaptSpan interface {
29+
AsyncSpan
30+
2231
GetTraceID() string
2332
GetSegmentID() string
2433
GetSpanID() int32
@@ -79,3 +88,11 @@ func (s *SpanWrapper) Error(v ...string) {
7988
func (s *SpanWrapper) End() {
8089
s.Span.End()
8190
}
91+
92+
func (s *SpanWrapper) PrepareAsync() {
93+
s.Span.PrepareAsync()
94+
}
95+
96+
func (s *SpanWrapper) AsyncFinish() {
97+
s.Span.AsyncFinish()
98+
}

plugins/core/tracing/span.go

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ type InjectorWrapper interface {
104104

105105
// Span for plugin API
106106
type Span interface {
107+
// AsyncSpan Async API
108+
AsyncSpan
109+
107110
// TraceID of span
108111
TraceID() string
109112
// TraceSegmentID current segment ID of span

0 commit comments

Comments
 (0)