15
15
package exporter
16
16
17
17
import (
18
+ // standard libraries.
18
19
"context"
20
+ "os"
19
21
22
+ // third-party libraries.
20
23
v2 "github.com/cloudevents/sdk-go/v2"
21
24
"github.com/google/uuid"
22
- vanussdk "github.com/vanus-labs/sdk/golang"
23
- "github.com/vanus-labs/vanus/observability/log"
24
-
25
25
"go.opentelemetry.io/otel/attribute"
26
26
tracesdk "go.opentelemetry.io/otel/sdk/trace"
27
+ "google.golang.org/grpc/credentials/insecure"
28
+
29
+ // first-party libraries.
30
+ "github.com/vanus-labs/vanus/observability/log"
31
+ "github.com/vanus-labs/vanus/pkg/cluster"
32
+ "github.com/vanus-labs/vanus/proto/pkg/cloudevents"
33
+ "github.com/vanus-labs/vanus/proto/pkg/codec"
34
+
35
+ // this project.
36
+ "github.com/vanus-labs/vanus/client"
37
+ "github.com/vanus-labs/vanus/client/pkg/api"
27
38
)
28
39
40
+ func GetExporter (endpoints []string , eventbus string ) tracesdk.SpanExporter {
41
+ spanExporter , err := New (context .Background (), WithEndpoints (endpoints ), WithEventbus (eventbus ))
42
+ if err != nil {
43
+ log .Error ().Err (err ).Msg ("new span exporter failed" )
44
+ os .Exit (- 1 )
45
+ }
46
+ return spanExporter
47
+ }
48
+
29
49
type Option func (* Options )
30
50
31
51
type Options struct {
32
- Endpoints string
52
+ Endpoints [] string
33
53
Eventbus string
34
54
}
35
55
36
56
func defaultOptions () * Options {
37
57
return & Options {
38
- Endpoints : "127.0.0.1:8080" ,
58
+ Endpoints : [] string {} ,
39
59
Eventbus : "event-tracing" ,
40
60
}
41
61
}
42
62
43
- func WithEndpoint ( endpoint string ) Option {
63
+ func WithEndpoints ( endpoints [] string ) Option {
44
64
return func (options * Options ) {
45
- options .Endpoints = endpoint
65
+ options .Endpoints = endpoints
46
66
}
47
67
}
48
68
@@ -54,9 +74,8 @@ func WithEventbus(eventbus string) Option {
54
74
55
75
// Exporter exports trace data in the OTLP wire format.
56
76
type Exporter struct {
57
- endpoints string
58
- client vanussdk.Client
59
- publisher vanussdk.Publisher
77
+ endpoints []string
78
+ writer api.BusWriter
60
79
}
61
80
62
81
var _ tracesdk.SpanExporter = (* Exporter )(nil )
@@ -67,53 +86,58 @@ func New(ctx context.Context, opts ...Option) (*Exporter, error) {
67
86
apply (defaultOpts )
68
87
}
69
88
70
- clientOpts := & vanussdk.ClientOptions {
71
- Endpoint : defaultOpts .Endpoints ,
72
- Token : "admin" ,
89
+ ctrl := cluster .NewClusterController (defaultOpts .Endpoints , insecure .NewCredentials ())
90
+ if err := ctrl .WaitForControllerReady (true ); err != nil {
91
+ log .Error (ctx ).Err (err ).Msg ("wait for controller ready timeout" )
92
+ return nil , err
73
93
}
74
-
75
- c , err := vanussdk .Connect (clientOpts )
94
+ eventbus , err := ctrl .EventbusService ().GetEventbusByName (ctx , "default" , defaultOpts .Eventbus )
76
95
if err != nil {
77
- panic ("failed to connect to Vanus cluster, error: " + err .Error ())
96
+ log .Error (ctx ).Err (err ).Str ("eventbus" , defaultOpts .Eventbus ).Msg ("failed to get eventbus" )
97
+ return nil , err
78
98
}
79
99
80
- ebOpt := vanussdk .WithEventbus ("default" , defaultOpts .Eventbus )
100
+ c := client .Connect (defaultOpts .Endpoints )
101
+ bus := c .Eventbus (ctx , api .WithName (defaultOpts .Eventbus ), api .WithID (eventbus .Id ))
81
102
exporter := & Exporter {
82
103
endpoints : defaultOpts .Endpoints ,
83
- client : c ,
84
- publisher : c .Publisher (ebOpt ),
85
- }
86
- _ , err = c .Controller ().Eventbus ().Get (ctx , ebOpt )
87
- if err != nil {
88
- panic ("failed to get tracing eventbus, error: " + err .Error ())
104
+ writer : bus .Writer (),
89
105
}
90
106
return exporter , nil
91
107
}
92
108
93
109
// ExportSpans exports a batch of spans.
94
110
func (e * Exporter ) ExportSpans (ctx context.Context , ss []tracesdk.ReadOnlySpan ) error {
95
- es := make ([]* v2. Event , 0 )
111
+ ces := make ([]* cloudevents. CloudEvent , 0 )
96
112
for _ , span := range ss {
97
- if span .Name () != "EventTracing" {
113
+ event := newEvent (span )
114
+ if event .Type () != "event-tracing" {
98
115
continue
99
116
}
100
- event := newEvent (span )
101
- es = append (es , & event )
117
+ eventpb , err := codec .ToProto (& event )
118
+ if err != nil {
119
+ log .Error (ctx ).Err (err ).Any ("event" , event ).Msg ("failed to proto event" )
120
+ return nil
121
+ }
122
+ ces = append (ces , eventpb )
102
123
}
103
124
104
- if len (es ) == 0 {
125
+ if len (ces ) == 0 {
105
126
return nil
106
127
}
107
128
108
- err := e .publisher .Publish (ctx , es ... )
129
+ ceBatch := & cloudevents.CloudEventBatch {
130
+ Events : ces ,
131
+ }
132
+ _ , err := e .writer .Append (ctx , ceBatch )
109
133
if err != nil {
110
- log .Error (ctx ).Err (err ).Msg ("failed to publish events to tracing eventbus" )
134
+ log .Error (ctx ).Err (err ).Msg ("failed to append events to tracing eventbus" )
111
135
return nil
112
136
}
113
137
return nil
114
138
}
115
139
116
- // Shutdown flushes all exports and closes all connections to the receiving endpoint.
140
+ // Shutdown
117
141
func (e * Exporter ) Shutdown (ctx context.Context ) error {
118
142
return nil
119
143
}
@@ -122,7 +146,6 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
122
146
event := v2 .NewEvent ()
123
147
event .SetID (uuid .New ().String ())
124
148
event .SetSource (span .Name ())
125
- event .SetType (span .SpanKind ().String ())
126
149
data := make (map [string ]interface {})
127
150
data ["name" ] = span .Name ()
128
151
data ["trace_id" ] = span .SpanContext ().TraceID ().String ()
@@ -136,6 +159,9 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
136
159
data [string (attr .Key )] = attr .Value .AsInt64 ()
137
160
} else if attr .Value .Type () == attribute .STRING {
138
161
data [string (attr .Key )] = attr .Value .AsString ()
162
+ if string (attr .Key ) == "type" && attr .Value .AsString () == "event-tracing" {
163
+ event .SetType ("event-tracing" )
164
+ }
139
165
}
140
166
}
141
167
data ["events" ] = span .Events ()
0 commit comments