@@ -16,33 +16,48 @@ package exporter
16
16
17
17
import (
18
18
"context"
19
+ "os"
19
20
20
21
v2 "github.com/cloudevents/sdk-go/v2"
21
22
"github.com/google/uuid"
22
- vanussdk "github.com/vanus-labs/sdk/golang"
23
+ "github.com/vanus-labs/vanus/client"
24
+ "github.com/vanus-labs/vanus/client/pkg/api"
23
25
"github.com/vanus-labs/vanus/observability/log"
26
+ "github.com/vanus-labs/vanus/pkg/cluster"
27
+ "github.com/vanus-labs/vanus/proto/pkg/cloudevents"
28
+ "github.com/vanus-labs/vanus/proto/pkg/codec"
29
+ "google.golang.org/grpc/credentials/insecure"
24
30
25
31
"go.opentelemetry.io/otel/attribute"
26
32
tracesdk "go.opentelemetry.io/otel/sdk/trace"
27
33
)
28
34
35
+ func GetExporter (endpoints []string , eventbus string ) tracesdk.SpanExporter {
36
+ spanExporter , err := New (context .Background (), WithEndpoints (endpoints ), WithEventbus (eventbus ))
37
+ if err != nil {
38
+ log .Error ().Err (err ).Msg ("new span exporter failed" )
39
+ os .Exit (- 1 )
40
+ }
41
+ return spanExporter
42
+ }
43
+
29
44
type Option func (* Options )
30
45
31
46
type Options struct {
32
- Endpoints string
47
+ Endpoints [] string
33
48
Eventbus string
34
49
}
35
50
36
51
func defaultOptions () * Options {
37
52
return & Options {
38
- Endpoints : "127.0.0.1:8080" ,
53
+ Endpoints : [] string {} ,
39
54
Eventbus : "event-tracing" ,
40
55
}
41
56
}
42
57
43
- func WithEndpoint ( endpoint string ) Option {
58
+ func WithEndpoints ( endpoints [] string ) Option {
44
59
return func (options * Options ) {
45
- options .Endpoints = endpoint
60
+ options .Endpoints = endpoints
46
61
}
47
62
}
48
63
@@ -54,9 +69,8 @@ func WithEventbus(eventbus string) Option {
54
69
55
70
// Exporter exports trace data in the OTLP wire format.
56
71
type Exporter struct {
57
- endpoints string
58
- client vanussdk.Client
59
- publisher vanussdk.Publisher
72
+ endpoints []string
73
+ writer api.BusWriter
60
74
}
61
75
62
76
var _ tracesdk.SpanExporter = (* Exporter )(nil )
@@ -67,47 +81,52 @@ func New(ctx context.Context, opts ...Option) (*Exporter, error) {
67
81
apply (defaultOpts )
68
82
}
69
83
70
- clientOpts := & vanussdk.ClientOptions {
71
- Endpoint : defaultOpts .Endpoints ,
72
- Token : "admin" ,
84
+ ctrl := cluster .NewClusterController (defaultOpts .Endpoints , insecure .NewCredentials ())
85
+ if err := ctrl .WaitForControllerReady (true ); err != nil {
86
+ log .Error (ctx ).Err (err ).Msg ("wait for controller ready timeout" )
87
+ return nil , err
73
88
}
74
-
75
- c , err := vanussdk .Connect (clientOpts )
89
+ eventbus , err := ctrl .EventbusService ().GetEventbusByName (ctx , "default" , defaultOpts .Eventbus )
76
90
if err != nil {
77
- panic ("failed to connect to Vanus cluster, error: " + err .Error ())
91
+ log .Error (ctx ).Err (err ).Str ("eventbus" , defaultOpts .Eventbus ).Msg ("failed to get eventbus" )
92
+ return nil , err
78
93
}
79
94
80
- ebOpt := vanussdk .WithEventbus ("default" , defaultOpts .Eventbus )
95
+ c := client .Connect (defaultOpts .Endpoints )
96
+ bus := c .Eventbus (ctx , api .WithName (defaultOpts .Eventbus ), api .WithID (eventbus .Id ))
81
97
exporter := & Exporter {
82
98
endpoints : defaultOpts .Endpoints ,
83
- client : c ,
84
- publisher : c .Publisher (ebOpt ),
85
- }
86
- _ , err = c .Controller ().Eventbus ().Get (ctx , ebOpt )
87
- if err != nil {
88
- panic ("failed to get tracing eventbus, error: " + err .Error ())
99
+ writer : bus .Writer (),
89
100
}
90
101
return exporter , nil
91
102
}
92
103
93
104
// ExportSpans exports a batch of spans.
94
105
func (e * Exporter ) ExportSpans (ctx context.Context , ss []tracesdk.ReadOnlySpan ) error {
95
- es := make ([]* v2. Event , 0 )
106
+ ces := make ([]* cloudevents. CloudEvent , 0 )
96
107
for _ , span := range ss {
97
- if span .Name () != "EventTracing" {
108
+ event := newEvent (span )
109
+ if event .Type () != "event-tracing" {
98
110
continue
99
111
}
100
- event := newEvent (span )
101
- es = append (es , & event )
112
+ eventpb , err := codec .ToProto (& event )
113
+ if err != nil {
114
+ log .Error (ctx ).Err (err ).Any ("event" , event ).Msg ("failed to proto event" )
115
+ return nil
116
+ }
117
+ ces = append (ces , eventpb )
102
118
}
103
119
104
- if len (es ) == 0 {
120
+ if len (ces ) == 0 {
105
121
return nil
106
122
}
107
123
108
- err := e .publisher .Publish (ctx , es ... )
124
+ ceBatch := & cloudevents.CloudEventBatch {
125
+ Events : ces ,
126
+ }
127
+ _ , err := e .writer .Append (ctx , ceBatch )
109
128
if err != nil {
110
- log .Error (ctx ).Err (err ).Msg ("failed to publish events to tracing eventbus" )
129
+ log .Error (ctx ).Err (err ).Msg ("failed to append events to tracing eventbus" )
111
130
return nil
112
131
}
113
132
return nil
@@ -122,7 +141,6 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
122
141
event := v2 .NewEvent ()
123
142
event .SetID (uuid .New ().String ())
124
143
event .SetSource (span .Name ())
125
- event .SetType (span .SpanKind ().String ())
126
144
data := make (map [string ]interface {})
127
145
data ["name" ] = span .Name ()
128
146
data ["trace_id" ] = span .SpanContext ().TraceID ().String ()
@@ -136,6 +154,9 @@ func newEvent(span tracesdk.ReadOnlySpan) v2.Event {
136
154
data [string (attr .Key )] = attr .Value .AsInt64 ()
137
155
} else if attr .Value .Type () == attribute .STRING {
138
156
data [string (attr .Key )] = attr .Value .AsString ()
157
+ if string (attr .Key ) == "type" && attr .Value .AsString () == "event-tracing" {
158
+ event .SetType ("event-tracing" )
159
+ }
139
160
}
140
161
}
141
162
data ["events" ] = span .Events ()
0 commit comments