@@ -10,28 +10,43 @@ import (
10
10
"errors"
11
11
"fmt"
12
12
"io"
13
+ "net/http"
13
14
"reflect"
14
15
"slices"
16
+ "strings"
15
17
"sync"
16
18
"sync/atomic"
17
19
"testing"
18
20
"time"
19
21
20
22
"github.com/gogo/protobuf/jsonpb"
21
23
"github.com/stretchr/testify/assert"
24
+ "github.com/stretchr/testify/mock"
22
25
"github.com/stretchr/testify/require"
26
+ "go.opentelemetry.io/collector/config/configgrpc"
27
+ "go.opentelemetry.io/collector/config/confighttp"
28
+ "go.opentelemetry.io/collector/config/confignet"
29
+ "go.opentelemetry.io/collector/pdata/ptrace"
30
+ otlptrace "go.opentelemetry.io/proto/otlp/collector/trace/v1"
31
+ tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
23
32
"go.uber.org/zap"
33
+ "go.uber.org/zap/zaptest"
34
+ "google.golang.org/grpc"
35
+ "google.golang.org/grpc/credentials/insecure"
36
+ "google.golang.org/grpc/metadata"
24
37
25
38
"github.com/jaegertracing/jaeger-idl/model/v1"
26
39
"github.com/jaegertracing/jaeger-idl/thrift-gen/jaeger"
27
40
zc "github.com/jaegertracing/jaeger-idl/thrift-gen/zipkincore"
41
+ cFlags "github.com/jaegertracing/jaeger/cmd/collector/app/flags"
28
42
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
29
43
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
30
44
zipkinsanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
31
45
"github.com/jaegertracing/jaeger/internal/metricstest"
32
46
"github.com/jaegertracing/jaeger/pkg/metrics"
33
47
"github.com/jaegertracing/jaeger/pkg/tenancy"
34
48
"github.com/jaegertracing/jaeger/pkg/testutils"
49
+ "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks"
35
50
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
36
51
)
37
52
@@ -807,3 +822,213 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
807
822
require .EqualError (t , err , processor .ErrBusy .Error ())
808
823
assert .Equal (t , []string {"op3" }, droppedOperations )
809
824
}
825
+
826
+ func optionsWithPorts (portHttp string , portGrpc string ) * cFlags.CollectorOptions {
827
+ opts := & cFlags.CollectorOptions {
828
+ OTLP : struct {
829
+ Enabled bool
830
+ GRPC configgrpc.ServerConfig
831
+ HTTP confighttp.ServerConfig
832
+ }{
833
+ Enabled : true ,
834
+ HTTP : confighttp.ServerConfig {
835
+ Endpoint : portHttp ,
836
+ IncludeMetadata : true ,
837
+ },
838
+ GRPC : configgrpc.ServerConfig {
839
+ NetAddr : confignet.AddrConfig {
840
+ Endpoint : portGrpc ,
841
+ Transport : confignet .TransportTypeTCP ,
842
+ },
843
+ },
844
+ },
845
+ }
846
+ return opts
847
+ }
848
+
849
+ func TestOTLPReceiverWithV2Storage (t * testing.T ) {
850
+ tests := []struct {
851
+ name string
852
+ requestType string
853
+ tenant string
854
+ expectedTenant string
855
+ expectedError bool
856
+ executeRequest func (ctx context.Context , url string , tenant string ) error
857
+ }{
858
+ {
859
+ name : "Valid tenant via HTTP" ,
860
+ requestType : "http" ,
861
+ tenant : "test-tenant" ,
862
+ expectedTenant : "test-tenant" ,
863
+ expectedError : false ,
864
+ executeRequest : sendHTTPRequest ,
865
+ },
866
+ {
867
+ name : "Invalid tenant via HTTP" ,
868
+ requestType : "http" ,
869
+ tenant : "invalid-tenant" ,
870
+ expectedTenant : "" ,
871
+ expectedError : true ,
872
+ executeRequest : sendHTTPRequest ,
873
+ },
874
+ {
875
+ name : "Valid tenant via gRPC" ,
876
+ requestType : "grpc" ,
877
+ tenant : "test-tenant" ,
878
+ expectedTenant : "test-tenant" ,
879
+ expectedError : false ,
880
+ executeRequest : sendGRPCRequest ,
881
+ },
882
+ {
883
+ name : "Invalid tenant via gRPC" ,
884
+ requestType : "grpc" ,
885
+ tenant : "invalid-tenant" ,
886
+ expectedTenant : "" ,
887
+ expectedError : true ,
888
+ executeRequest : sendGRPCRequest ,
889
+ },
890
+ }
891
+
892
+ for _ , tt := range tests {
893
+ t .Run (tt .name , func (t * testing.T ) {
894
+ mockWriter := mocks .NewWriter (t )
895
+
896
+ spanProcessor , err := NewSpanProcessor (
897
+ mockWriter ,
898
+ nil ,
899
+ Options .NumWorkers (1 ),
900
+ Options .QueueSize (1 ),
901
+ Options .ReportBusy (true ),
902
+ )
903
+ require .NoError (t , err )
904
+ defer spanProcessor .Close ()
905
+ logger := zaptest .NewLogger (t )
906
+
907
+ portHttp := "4317"
908
+ portGrpc := "4318"
909
+
910
+ var receivedTraces atomic.Pointer [ptrace.Traces ]
911
+ var receivedCtx atomic.Pointer [context.Context ]
912
+ if ! tt .expectedError {
913
+ mockWriter .On ("WriteTraces" , mock .Anything , mock .Anything ).
914
+ Run (func (args mock.Arguments ) {
915
+ storeContext := args .Get (0 ).(context.Context )
916
+ storeTrace := args .Get (1 ).(ptrace.Traces )
917
+ receivedTraces .Store (& storeTrace )
918
+ receivedCtx .Store (& storeContext )
919
+ }).Return (nil )
920
+ }
921
+
922
+ tenancyMgr := tenancy .NewManager (& tenancy.Options {
923
+ Enabled : true ,
924
+ Header : "x-tenant" ,
925
+ Tenants : []string {"test-tenant" },
926
+ })
927
+
928
+ rec , err := handler .StartOTLPReceiver (
929
+ optionsWithPorts (fmt .Sprintf ("localhost:%v" , portHttp ), fmt .Sprintf ("localhost:%v" , portGrpc )),
930
+ logger ,
931
+ spanProcessor ,
932
+ tenancyMgr ,
933
+ )
934
+ require .NoError (t , err )
935
+ ctx := context .Background ()
936
+ defer rec .Shutdown (ctx )
937
+
938
+ var url string
939
+ if tt .requestType == "http" {
940
+ url = fmt .Sprintf ("http://localhost:%v/v1/traces" , portHttp )
941
+ } else {
942
+ url = fmt .Sprintf ("localhost:%v" , portGrpc )
943
+ }
944
+ err = tt .executeRequest (ctx , url , tt .tenant )
945
+ if tt .expectedError {
946
+ assert .Error (t , err )
947
+ return
948
+ }
949
+ require .NoError (t , err )
950
+
951
+ assert .Eventually (t , func () bool {
952
+ storedTraces := receivedTraces .Load ()
953
+ storedCtx := receivedCtx .Load ()
954
+ if storedTraces == nil || storedCtx == nil {
955
+ return false
956
+ }
957
+ receivedSpan := storedTraces .ResourceSpans ().At (0 ).
958
+ ScopeSpans ().At (0 ).
959
+ Spans ().At (0 )
960
+ receivedTenant := tenancy .GetTenant (* storedCtx )
961
+ return receivedSpan .Name () == "test-trace" && receivedTenant == tt .expectedTenant
962
+ }, 1 * time .Second , 100 * time .Millisecond )
963
+
964
+ mockWriter .AssertExpectations (t )
965
+ })
966
+ }
967
+ }
968
+
969
+ // Helper function to send HTTP request
970
+ func sendHTTPRequest (ctx context.Context , url string , tenant string ) error {
971
+ traceJSON := `{
972
+ "resourceSpans": [{
973
+ "scopeSpans": [{
974
+ "spans": [{
975
+ "name": "test-trace"
976
+ }]
977
+ }]
978
+ }]
979
+ }`
980
+
981
+ req , err := http .NewRequestWithContext (ctx , http .MethodPost , url , strings .NewReader (traceJSON ))
982
+ if err != nil {
983
+ return err
984
+ }
985
+ req .Header .Set ("Content-Type" , "application/json" )
986
+ req .Header .Set ("x-tenant" , tenant )
987
+
988
+ resp , err := http .DefaultClient .Do (req )
989
+ if err != nil {
990
+ return err
991
+ }
992
+ defer resp .Body .Close ()
993
+ if resp .StatusCode != http .StatusOK {
994
+ return fmt .Errorf ("unexpected status code: %d" , resp .StatusCode )
995
+ }
996
+ return nil
997
+ }
998
+
999
+ // Helper function to send gRPC request
1000
+ func sendGRPCRequest (ctx context.Context , url string , tenant string ) error {
1001
+ conn , err := grpc .NewClient (
1002
+ url ,
1003
+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
1004
+ )
1005
+ if err != nil {
1006
+ return err
1007
+ }
1008
+ defer conn .Close ()
1009
+
1010
+ md := metadata .New (map [string ]string {
1011
+ "x-tenant" : tenant ,
1012
+ })
1013
+ ctxWithMD := metadata .NewOutgoingContext (ctx , md )
1014
+
1015
+ client := otlptrace .NewTraceServiceClient (conn )
1016
+ req := & otlptrace.ExportTraceServiceRequest {
1017
+ ResourceSpans : []* tracepb.ResourceSpans {
1018
+ {
1019
+ ScopeSpans : []* tracepb.ScopeSpans {
1020
+ {
1021
+ Spans : []* tracepb.Span {
1022
+ {
1023
+ Name : "test-trace" ,
1024
+ },
1025
+ },
1026
+ },
1027
+ },
1028
+ },
1029
+ },
1030
+ }
1031
+
1032
+ _ , err = client .Export (ctxWithMD , req )
1033
+ return err
1034
+ }
0 commit comments