@@ -8,108 +8,157 @@ import (
88 "errors"
99
1010 "go.opentelemetry.io/collector/exporter/exporterhelper"
11+ "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sizer"
1112 "go.opentelemetry.io/collector/pdata/pprofile"
1213)
1314
1415// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig.
15- func (req * profilesRequest ) MergeSplit (_ context.Context , maxSize int , _ exporterhelper.RequestSizerType , r2 exporterhelper.Request ) ([]exporterhelper.Request , error ) {
16+ func (req * profilesRequest ) MergeSplit (_ context.Context , maxSize int , szt exporterhelper.RequestSizerType , r2 exporterhelper.Request ) ([]exporterhelper.Request , error ) {
17+ var sz sizer.ProfilesSizer
18+ switch szt {
19+ case exporterhelper .RequestSizerTypeItems :
20+ sz = & sizer.ProfilesCountSizer {}
21+ case exporterhelper .RequestSizerTypeBytes :
22+ sz = & sizer.ProfilesBytesSizer {}
23+ default :
24+ return nil , errors .New ("unknown sizer type" )
25+ }
26+
1627 if r2 != nil {
1728 req2 , ok := r2 .(* profilesRequest )
1829 if ! ok {
1930 return nil , errors .New ("invalid input type" )
2031 }
21- req2 .mergeTo (req )
32+ req2 .mergeTo (req , sz )
2233 }
2334
2435 // If no limit we can simply merge the new request into the current and return.
2536 if maxSize == 0 {
2637 return []exporterhelper.Request {req }, nil
2738 }
28- return req .split (maxSize )
39+ return req .split (maxSize , sz ), nil
2940}
3041
31- func (req * profilesRequest ) mergeTo (dst * profilesRequest ) {
32- dst .setCachedItemsCount (dst .ItemsCount () + req .ItemsCount ())
33- req .setCachedItemsCount (0 )
42+ func (req * profilesRequest ) mergeTo (dst * profilesRequest , sz sizer.ProfilesSizer ) {
43+ if sz != nil {
44+ dst .setCachedSize (dst .size (sz ) + req .size (sz ))
45+ req .setCachedSize (0 )
46+ }
3447 req .pd .ResourceProfiles ().MoveAndAppendTo (dst .pd .ResourceProfiles ())
3548}
3649
37- func (req * profilesRequest ) split (maxSize int ) ( []exporterhelper.Request , error ) {
50+ func (req * profilesRequest ) split (maxSize int , sz sizer. ProfilesSizer ) []exporterhelper.Request {
3851 var res []exporterhelper.Request
39- for req .ItemsCount () > maxSize {
40- pd := extractProfiles (req .pd , maxSize )
41- size := pd .SampleCount ()
42- req .setCachedItemsCount (req .ItemsCount () - size )
43- res = append (res , & profilesRequest {pd : pd , cachedItemsCount : size })
52+ for req .size (sz ) > maxSize {
53+ pd , rmSize := extractProfiles (req .pd , maxSize , sz )
54+ req .setCachedSize (req .size (sz ) - rmSize )
55+ res = append (res , newProfilesRequest (pd ))
4456 }
4557 res = append (res , req )
46- return res , nil
58+ return res
4759}
4860
4961// extractProfiles extracts a new profiles with a maximum number of samples.
50- func extractProfiles (srcProfiles pprofile.Profiles , count int ) pprofile.Profiles {
62+ func extractProfiles (srcProfiles pprofile.Profiles , capacity int , sz sizer. ProfilesSizer ) ( pprofile.Profiles , int ) {
5163 destProfiles := pprofile .NewProfiles ()
52- srcProfiles .ResourceProfiles ().RemoveIf (func (srcRS pprofile.ResourceProfiles ) bool {
53- if count == 0 {
64+ capacityLeft := capacity - sz .ProfilesSize (destProfiles )
65+ removedSize := 0
66+ srcProfiles .ResourceProfiles ().RemoveIf (func (srcRP pprofile.ResourceProfiles ) bool {
67+ // If the no more capacity left just return.
68+ if capacityLeft == 0 {
5469 return false
5570 }
56- needToExtract := samplesCount (srcRS ) > count
57- if needToExtract {
58- srcRS = extractResourceProfiles (srcRS , count )
71+ rawRpSize := sz .ResourceProfilesSize (srcRP )
72+ rpSize := sz .DeltaSize (rawRpSize )
73+
74+ if rpSize > capacityLeft {
75+ extSrcRP , extRpSize := extractResourceProfiles (srcRP , capacityLeft , sz )
76+ // This cannot make it to exactly 0 for the bytes,
77+ // force it to be 0 since that is the stopping condition.
78+ capacityLeft = 0
79+ removedSize += extRpSize
80+ // There represents the delta between the delta sizes.
81+ removedSize += rpSize - rawRpSize - (sz .DeltaSize (rawRpSize - extRpSize ) - (rawRpSize - extRpSize ))
82+ // It is possible that for the bytes scenario, the extracted field contains no profiles.
83+ // Do not add it to the destination if that is the case.
84+ if extSrcRP .ScopeProfiles ().Len () > 0 {
85+ extSrcRP .MoveTo (destProfiles .ResourceProfiles ().AppendEmpty ())
86+ }
87+ return extSrcRP .ScopeProfiles ().Len () != 0
5988 }
60- count -= samplesCount (srcRS )
61- srcRS .MoveTo (destProfiles .ResourceProfiles ().AppendEmpty ())
62- return ! needToExtract
89+ capacityLeft -= rpSize
90+ removedSize += rpSize
91+ srcRP .MoveTo (destProfiles .ResourceProfiles ().AppendEmpty ())
92+ return true
6393 })
64- return destProfiles
94+ return destProfiles , removedSize
6595}
6696
6797// extractResourceProfiles extracts profiles and returns a new resource profiles with the specified number of profiles.
68- func extractResourceProfiles (srcRS pprofile.ResourceProfiles , count int ) pprofile.ResourceProfiles {
69- destRS := pprofile .NewResourceProfiles ()
70- destRS .SetSchemaUrl (srcRS .SchemaUrl ())
71- srcRS .Resource ().CopyTo (destRS .Resource ())
72- srcRS .ScopeProfiles ().RemoveIf (func (srcSS pprofile.ScopeProfiles ) bool {
73- if count == 0 {
98+ func extractResourceProfiles (srcRP pprofile.ResourceProfiles , capacity int , sz sizer.ProfilesSizer ) (pprofile.ResourceProfiles , int ) {
99+ destRP := pprofile .NewResourceProfiles ()
100+ destRP .SetSchemaUrl (srcRP .SchemaUrl ())
101+ srcRP .Resource ().CopyTo (destRP .Resource ())
102+ // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
103+ capacityLeft := capacity - (sz .DeltaSize (capacity ) - capacity ) - sz .ResourceProfilesSize (destRP )
104+ removedSize := 0
105+
106+ srcRP .ScopeProfiles ().RemoveIf (func (srcSS pprofile.ScopeProfiles ) bool {
107+ // If the no more capacity left just return.
108+ if capacityLeft == 0 {
74109 return false
75110 }
76- needToExtract := srcSS .Profiles ().Len () > count
77- if needToExtract {
78- srcSS = extractScopeProfiles (srcSS , count )
111+
112+ rawSlSize := sz .ScopeProfilesSize (srcSS )
113+ ssSize := sz .DeltaSize (rawSlSize )
114+ if ssSize > capacityLeft {
115+ extSrcSS , extSsSize := extractScopeProfiles (srcSS , capacityLeft , sz )
116+ // This cannot make it to exactly 0 for the bytes,
117+ // force it to be 0 since that is the stopping condition.
118+ capacityLeft = 0
119+ removedSize += extSsSize
120+ // There represents the delta between the delta sizes.
121+ removedSize += ssSize - rawSlSize - (sz .DeltaSize (rawSlSize - extSsSize ) - (rawSlSize - extSsSize ))
122+ // It is possible that for the bytes scenario, the extracted field contains no profiles.
123+ // Do not add it to the destination if that is the case.
124+ if extSrcSS .Profiles ().Len () > 0 {
125+ extSrcSS .MoveTo (destRP .ScopeProfiles ().AppendEmpty ())
126+ }
127+ return extSrcSS .Profiles ().Len () != 0
79128 }
80- count -= srcSS .Profiles ().Len ()
81- srcSS .MoveTo (destRS .ScopeProfiles ().AppendEmpty ())
82- return ! needToExtract
129+ capacityLeft -= ssSize
130+ removedSize += ssSize
131+ srcSS .MoveTo (destRP .ScopeProfiles ().AppendEmpty ())
132+ return true
83133 })
84- srcRS . Resource (). CopyTo ( destRS . Resource ())
85- return destRS
134+
135+ return destRP , removedSize
86136}
87137
88138// extractScopeProfiles extracts profiles and returns a new scope profiles with the specified number of profiles.
89- func extractScopeProfiles (srcSS pprofile.ScopeProfiles , count int ) pprofile.ScopeProfiles {
139+ func extractScopeProfiles (srcSS pprofile.ScopeProfiles , capacity int , sz sizer. ProfilesSizer ) ( pprofile.ScopeProfiles , int ) {
90140 destSS := pprofile .NewScopeProfiles ()
91141 destSS .SetSchemaUrl (srcSS .SchemaUrl ())
92142 srcSS .Scope ().CopyTo (destSS .Scope ())
143+ // Take into account that this can have max "capacity", so when added to the parent will need space for the extra delta size.
144+ capacityLeft := capacity - (sz .DeltaSize (capacity ) - capacity ) - sz .ScopeProfilesSize (destSS )
145+ removedSize := 0
93146 srcSS .Profiles ().RemoveIf (func (srcProfile pprofile.Profile ) bool {
94- if count == 0 {
147+ // If the no more capacity left just return.
148+ if capacityLeft == 0 {
149+ return false
150+ }
151+ rsSize := sz .DeltaSize (sz .ProfileSize (srcProfile ))
152+ if rsSize > capacityLeft {
153+ // This cannot make it to exactly 0 for the bytes,
154+ // force it to be 0 since that is the stopping condition.
155+ capacityLeft = 0
95156 return false
96157 }
158+ capacityLeft -= rsSize
159+ removedSize += rsSize
97160 srcProfile .MoveTo (destSS .Profiles ().AppendEmpty ())
98- count --
99161 return true
100162 })
101- return destSS
102- }
103-
104- // resourceProfilessCount calculates the total number of profiles in the pdata.ResourceProfiles.
105- func samplesCount (rs pprofile.ResourceProfiles ) int {
106- count := 0
107- rs .ScopeProfiles ().RemoveIf (func (ss pprofile.ScopeProfiles ) bool {
108- ss .Profiles ().RemoveIf (func (sp pprofile.Profile ) bool {
109- count += sp .Sample ().Len ()
110- return false
111- })
112- return false
113- })
114- return count
163+ return destSS , removedSize
115164}
0 commit comments